大家好,又见面了,我是你们的朋友全栈君。
参考视频 https://www.iqiyi.com/v_19rr1myxr8.html
重点 看3.0从数据库中读
整体的框架
注意:
- 在批处理数据库的时候,一定要注意大字段(CLOB)和时间字段的处理问题。
- 如果遇到大字段,需要先把读取到的大字段转为String,再存入数据库。下面给出示例代码。
/**
 * Converts a {@link Clob} into a {@code String}.
 *
 * <p>The character stream is copied verbatim, so line terminators inside the CLOB are
 * preserved (a line-by-line copy would silently drop them). The stream is closed even
 * when reading fails.
 *
 * @param clob the CLOB to read; may be {@code null}
 * @return the full CLOB content, or an empty string when {@code clob} is {@code null}
 * @throws SQLException if the CLOB's character stream cannot be obtained
 * @throws IOException if reading from the character stream fails
 */
public static String ClobToString(Clob clob) throws SQLException, IOException {
    if (clob == null) {
        return "";
    }
    StringBuilder content = new StringBuilder();
    // try-with-resources guarantees the reader is released on every exit path.
    try (Reader reader = clob.getCharacterStream()) {
        char[] buffer = new char[4096];
        int read;
        while ((read = reader.read(buffer)) != -1) {
            content.append(buffer, 0, read);
        }
    }
    return content.toString();
}
1.搭建springbatch的框架
1.必须导入的依赖。因为Spring Batch必须配置数据源dataSource,所以引入了数据库相关的jar包。
<!-- Spring Boot starter for Spring Batch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- A data source must be configured, so a JDBC driver is needed (MySQL here) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
2.相关的配置文件application.properties。最后两行很重要,因为Spring Batch会依赖一些元数据表:最后一行的配置用于执行倒数第二行指定的sql脚本。当你执行启动类的时候,就会在数据库中自动创建那些必须的表。
# JDBC connection settings for the MySQL database named "springbatch"
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/springbatch?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123
# Location of the Spring Batch MySQL schema script (creates the metadata tables)
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
# Always initialize the Spring Batch schema at startup so the tables from the script above exist
spring.batch.initialize-schema=always
2.最简单的例子,在启动类启动的时候会加载并执行它
2.1最简单的一个step
package com.jeremy.springbatch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Minimal Spring Batch configuration: a single job made of one tasklet step that prints
 * {@code hello world}.
 */
@Configuration
@EnableBatchProcessing
public class JobConfiguration {

    /** Factory used to create {@link Job} instances. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create {@link Step} instances; a job's work is done by its steps. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the job named {@code helloWorldJob}, which runs {@link #step1()} only.
     *
     * @return the configured job
     */
    @Bean
    public Job helloWorldJob() {
        return jobBuilderFactory.get("helloWorldJob")
                .start(step1())
                .build();
    }

    /**
     * Builds the step named {@code step1}.
     *
     * <p>The name used to be registered as the misspelled {@code "setp1"}; it now matches
     * the bean name and the intended step name.
     *
     * @return a tasklet step that prints a greeting and finishes
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("hello world");
                    // FINISHED tells the framework not to invoke the tasklet again.
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
2.2一次处理多个step
package com.jeremy.springbatch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Demonstrates a job composed of three steps chained with conditional transitions.
 */
@Configuration
@EnableBatchProcessing // enables Spring Batch infrastructure
public class JobDemo {

    /** Factory used to create {@link Job} instances; must be injected. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create {@link Step} instances. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds a job containing three steps. Each transition is conditional: {@code on} is the
     * exit status to match and {@code to} is the step to move to.
     *
     * <p>The step factory methods below are not {@code @Bean} methods, so every call creates
     * a new {@link Step}. Each step is therefore built exactly once here and the same
     * instance is reused for both ends of its transitions.
     *
     * @return the configured job
     */
    @Bean
    public Job jobDemoJob() {
        Step first = step1();
        Step second = step2();
        Step third = step3();
        // Alternative (unconditional sequence):
        //   .start(first).next(second).next(third).build()
        return jobBuilderFactory.get("jobDemojob")
                .start(first)
                .on("COMPLETED").to(second)
                .from(second).on("COMPLETED").to(third)
                .from(third).end()
                .build();
    }

    /**
     * Creates the first step; step2 only runs after this one completes.
     *
     * @return a tasklet step that prints a marker and finishes
     */
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("tep1");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    /**
     * Creates the second step.
     *
     * @return a tasklet step that prints a marker and finishes
     */
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("tep2");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    /**
     * Creates the third step.
     *
     * @return a tasklet step that prints a marker and finishes
     */
    public Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("tep3");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
2.3flow包含多个step
package com.jeremy.springbatch.config;
import javafx.beans.property.SetProperty;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Demonstrates grouping steps into a {@link Flow} and using that flow inside a job.
 */
@Configuration
@EnableBatchProcessing
public class FlowDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step flowDemoSetp1() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("flowDemoStep1");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("flowDemoStep1").tasklet(printName).build();
    }

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step flowDemoSetp2() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("flowDemoStep2");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("flowDemoStep2").tasklet(printName).build();
    }

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step flowDemoSetp3() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("flowDemoStep3");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("flowDemoStep3").tasklet(printName).build();
    }

    /**
     * Builds the flow and declares which steps it contains: step 1 followed by step 2.
     *
     * @return the configured flow
     */
    @Bean
    public Flow flowDemoFlow() {
        FlowBuilder<Flow> builder = new FlowBuilder<Flow>("flowDemoFlow");
        return builder
                .start(flowDemoSetp1())
                .next(flowDemoSetp2())
                .build();
    }

    /**
     * Builds the job: it runs the flow first and then step 3.
     *
     * @return the configured job
     */
    @Bean
    public Job flowDemoJob() {
        return jobBuilderFactory.get("flowDemoJob")
                .start(flowDemoFlow())
                .next(flowDemoSetp3())
                .end()
                .build();
    }
}
2.4 split并发执行多个flow(开启多线程)
package com.jeremy.springbatch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
 * Demonstrates running two flows side by side using a split with an asynchronous task
 * executor.
 */
@Configuration
@EnableBatchProcessing
public class SplitDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step splitDemo1() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("splitDemo1");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("splitDemo1").tasklet(printName).build();
    }

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step splitDemo2() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("splitDemo2");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("splitDemo2").tasklet(printName).build();
    }

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step splitDemo3() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("splitDemo3");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("splitDemo3").tasklet(printName).build();
    }

    /** @return the first flow, containing only {@link #splitDemo1()} */
    @Bean
    public Flow splitDemoFlow1() {
        FlowBuilder<Flow> builder = new FlowBuilder<Flow>("splitDemoFlow1");
        return builder.start(splitDemo1()).build();
    }

    /** @return the second flow: {@link #splitDemo2()} followed by {@link #splitDemo3()} */
    @Bean
    public Flow splitDemoFlow2() {
        FlowBuilder<Flow> builder = new FlowBuilder<Flow>("splitDemoFlow2");
        return builder
                .start(splitDemo2())
                .next(splitDemo3())
                .build();
    }

    /**
     * Builds the job in which the two flows are executed concurrently.
     *
     * @return the configured job
     */
    @Bean
    public Job splitDemoJob() {
        return jobBuilderFactory.get("splitDemoJob")
                .start(splitDemoFlow1())
                .split(new SimpleAsyncTaskExecutor()).add(splitDemoFlow2())
                .end()
                .build();
    }
}
2.5决策器使用
package com.jeremy.springbatch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Demonstrates a {@link JobExecutionDecider}: the decider's result selects which step the
 * job runs next.
 */
@Configuration
@EnableBatchProcessing
public class DeciderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return the entry step, which prints its own name and finishes */
    @Bean
    public Step deciderDemoStep1() {
        Tasklet announce = (contribution, context) -> {
            System.out.println("deciderDemoStep1");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("deciderDemoStep1").tasklet(announce).build();
    }

    /** @return the step taken on the "even" branch */
    @Bean
    public Step deciderDemoStep2() {
        Tasklet announce = (contribution, context) -> {
            System.out.println("even");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("deciderDemoStep2").tasklet(announce).build();
    }

    /** @return the step taken on the "odd" branch */
    @Bean
    public Step deciderDemoStep3() {
        Tasklet announce = (contribution, context) -> {
            System.out.println("odd");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("deciderDemoStep3").tasklet(announce).build();
    }

    /** @return the decider used to choose between the branches */
    @Bean
    public JobExecutionDecider myDecider() {
        return new MyDecider();
    }

    /**
     * Builds the job: step 1, then the decider; "even" leads to step 2 and "odd" to step 3.
     *
     * @return the configured job
     */
    @Bean
    public Job deciderDemoJob() {
        return jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                // Whatever step 3 returns, go back to the decider so it is consulted again.
                .from(deciderDemoStep3()).on("*").to(myDecider())
                .end()
                .build();
    }
}
自己的决策器
package com.jeremy.springbatch.config;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
/**
 * Decider that alternates between the statuses "odd" and "even" based on how many times
 * it has been consulted.
 */
public class MyDecider implements JobExecutionDecider {

    /** Number of decisions made so far by this instance. */
    private int count;

    /**
     * Increments the call counter and reports its parity.
     *
     * @param jobExecution the current job execution (not used)
     * @param stepExecution the execution of the preceding step (not used)
     * @return status "even" when the counter is divisible by two, otherwise "odd"
     */
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count += 1;
        String parity = (count % 2 != 0) ? "odd" : "even";
        return new FlowExecutionStatus(parity);
    }
}
2.6Job的嵌套
一个Job可以嵌套在另一个Job中,被嵌套的Job称为子Job,外部的Job称为父Job。子Job不能单独执行,需要由父Job来启动。
案例:创建两个job,作为子Job,再创建一个job作为父Job
两个子Job
第一个子Job
package com.jeremy.springbatch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableMBeanExport;
/**
 * First child job for the nested-job example: a single tasklet step.
 *
 * <p>{@code @EnableBatchProcessing} is intentionally not declared here; per the original
 * note it is placed on the application's main class instead.
 */
@Configuration
public class ChildJob1 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step childJob1Step1() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("childJob1Step1");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("childJob1Step1").tasklet(printName).build();
    }

    /** @return the child job named {@code childJobOne}, made of {@link #childJob1Step1()} */
    @Bean
    public Job childJobOne() {
        Step onlyStep = childJob1Step1();
        return jobBuilderFactory.get("childJobOne").start(onlyStep).build();
    }
}
第二个子Job
package com.jeremy.springbatch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Second child job for the nested-job example: two tasklet steps run in sequence.
 *
 * <p>{@code @EnableBatchProcessing} is intentionally not declared here; per the original
 * note it is placed on the application's main class instead.
 */
@Configuration
public class ChildJob2 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step childJob2Step1() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("childJob2Step1");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("childJob2Step1").tasklet(printName).build();
    }

    /** @return a tasklet step that prints its own name and finishes */
    @Bean
    public Step childJob2Step2() {
        Tasklet printName = (contribution, context) -> {
            System.out.println("childJob2Step2");
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("childJob2Step2").tasklet(printName).build();
    }

    /** @return the child job named {@code childJobTow}: step 1 followed by step 2 */
    @Bean
    public Job childJobTow() {
        return jobBuilderFactory.get("childJobTow")
                .start(childJob2Step1())
                .next(childJob2Step2())
                .build();
    }
}
父Job
package com.jeremy.springbatch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.job.JobStep;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Parent job for the nested-job example: each child job is wrapped in a job-typed step
 * and the parent runs those steps in order.
 */
@Configuration
public class NestedDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private Job childJobOne;

    @Autowired
    private Job childJobTow;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Builds the parent job, which runs the first child job and then the second.
     *
     * @param repository job repository handed to the wrapping steps
     * @param transactionManager transaction manager handed to the wrapping steps
     * @return the configured parent job
     */
    @Bean
    public Job parentJobs(JobRepository repository, PlatformTransactionManager transactionManager) {
        Step first = childJob1(repository, transactionManager);
        Step second = childJob2(repository, transactionManager);
        return jobBuilderFactory.get("parentJobs")
                .start(first)
                .next(second)
                .build();
    }

    /** Wraps the second child job in a step named {@code childJob2}. */
    private Step childJob2(JobRepository repository, PlatformTransactionManager transactionManager) {
        return wrapAsStep("childJob2", childJobTow, repository, transactionManager);
    }

    /** Wraps the first child job in a step named {@code childJob1}. */
    private Step childJob1(JobRepository repository, PlatformTransactionManager transactionManager) {
        return wrapAsStep("childJob1", childJobOne, repository, transactionManager);
    }

    /**
     * Builds a job-typed step (a special kind of step that runs a whole job).
     *
     * @param stepName name given to the wrapping step
     * @param child the job to run inside the step
     * @param repository job repository for the step
     * @param transactionManager transaction manager for the step
     * @return the step wrapping {@code child}
     */
    private Step wrapAsStep(String stepName, Job child, JobRepository repository,
            PlatformTransactionManager transactionManager) {
        JobStepBuilder builder = new JobStepBuilder(new StepBuilder(stepName));
        return builder
                .job(child)
                .launcher(jobLauncher) // reuse the launcher that starts the parent job
                .repository(repository)
                .transactionManager(transactionManager)
                .build();
    }
}
因为只需要启动父Job,不需要单独启动子Job,所以在application.properties中配置:
# Name of the job to launch at startup (only the parent job)
spring.batch.job.names=parentJobs
2.7 监听器的使用。这里监听的是job和chunk,其他的如step、read、process、write也都可以监听
两种配置监听的方式一种是实现接口还有一种是使用注解
- 方式一接口
package com.jeremy.springbatch.listener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
 * Interface-based job listener that prints the job name before and after the job runs.
 */
public class MyJobListener implements JobExecutionListener {

    /**
     * Prints the job name followed by {@code before..}.
     *
     * @param jobExecution the execution about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        String jobName = jobExecution.getJobInstance().getJobName();
        System.out.println(jobName + "before..");
    }

    /**
     * Prints the job name followed by {@code after..}.
     *
     * @param jobExecution the execution that has just ended
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        String jobName = jobExecution.getJobInstance().getJobName();
        System.out.println(jobName + "after..");
    }
}
- 方法二 注解
package com.jeremy.springbatch.listener;
import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.BeforeChunk;
import org.springframework.batch.core.scope.context.ChunkContext;
/**
 * Annotation-based chunk listener that prints the step name around each chunk.
 */
public class MyChunkListener {

    /**
     * Prints the step name followed by {@code before}.
     *
     * @param context context of the chunk about to be processed
     */
    @BeforeChunk
    public void beforeChunk(ChunkContext context) {
        String stepName = context.getStepContext().getStepName();
        System.out.println(stepName + "before");
    }

    /**
     * Prints the step name followed by {@code after}.
     *
     * @param context context of the chunk that was just processed
     */
    @AfterChunk
    public void afterChunk(ChunkContext context) {
        String stepName = context.getStepContext().getStepName();
        System.out.println(stepName + "after");
    }
}
- 使用监听
package com.jeremy.springbatch.listener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Wires the job listener and the chunk listener into a simple chunk-oriented job.
 */
@Configuration
public class ListenerDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return the job named {@code listenerJob} with a {@link MyJobListener} attached */
    @Bean
    public Job listenerJob() {
        return jobBuilderFactory.get("listenerJob")
                .start(step11())
                .listener(new MyJobListener())
                .build();
    }

    /**
     * Builds a chunk-oriented step with a chunk size of two; the generic parameters are
     * the input and output item types.
     *
     * @return the step with a {@link MyChunkListener} attached
     */
    @Bean
    public Step step11() {
        return stepBuilderFactory.get("step11")
                .<String, String>chunk(2)
                .faultTolerant()
                .listener(new MyChunkListener())
                .reader(read())
                .writer(writer())
                .build();
    }

    /** @return a writer that prints every item of the chunk on its own line */
    @Bean
    public ItemWriter<String> writer() {
        return items -> {
            for (String entry : items) {
                System.out.println(entry);
            }
        };
    }

    /** @return a reader over a fixed in-memory list of three strings */
    @Bean
    public ItemReader<String> read() {
        List<String> source = Arrays.asList("java", "spring", "mybatis");
        return new ListItemReader<>(source);
    }
}
2.8Job参数
2.9 ItemReader简单实现
package com.jeremy.springbatch.itemreader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
 * Demonstrates a job whose step reads from the custom {@link MyReader}.
 */
@Configuration
public class ItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return the job named {@code itemReaderDemoJob} */
    @Bean
    public Job itemReaderDemoJob() {
        Step onlyStep = itemReaderDemoSteo();
        return jobBuilderFactory.get("itemReaderDemoJob").start(onlyStep).build();
    }

    /**
     * Builds a chunk-oriented step: after two items are read they are handed to the
     * writer, which prints each one followed by {@code ...}.
     *
     * @return the configured step
     */
    @Bean
    public Step itemReaderDemoSteo() {
        return stepBuilderFactory.get("itemReaderDemoSteo")
                .<String, String>chunk(2)
                .reader(itemReaderDemoRead())
                .writer(entries -> entries.forEach(entry -> System.out.println(entry + "...")))
                .build();
    }

    /** @return a {@link MyReader} over a fixed list of four strings */
    @Bean
    public MyReader itemReaderDemoRead() {
        return new MyReader(Arrays.asList("cat", "dog", "pig", "duck"));
    }
}
ItemReader的实现
package com.jeremy.springbatch.itemreader;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import java.util.Iterator;
import java.util.List;
/**
 * Simple {@link ItemReader} that hands out the elements of an in-memory list one at a time.
 *
 * <p>The reader is parameterized as {@code ItemReader<String>} instead of the raw type so
 * that it can be plugged into a typed chunk step without unchecked warnings.
 */
public class MyReader implements ItemReader<String> {

    /** Cursor over the remaining items; fixed at construction time. */
    private final Iterator<String> iterator;

    /**
     * Creates a reader over the given list.
     *
     * @param list the items to read, in iteration order
     */
    public MyReader(List<String> list) {
        this.iterator = list.iterator();
    }

    /**
     * Returns the next item.
     *
     * @return the next element, or {@code null} once every element has been returned
     */
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Items are delivered one per call; null is returned when the list is used up.
        return iterator.hasNext() ? iterator.next() : null;
    }
}
3.0ItemReader从数据库中读–重点
关注点:一定要配置DataSource数据源。我这边在application.properties中配置了,Spring Boot会自动创建并管理它。
- 去数据库里帮我们读,其实我们一般的读都可以用这个方法
package com.jeremy.springbatch.iteamreaderdb;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
/**
 * Demonstrates reading {@code User} rows from the database with a
 * {@link JdbcPagingItemReader} and passing them to {@code DbJdbcWriter}.
 */
@Configuration
public class IteamReaderDBDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private DbJdbcWriter dbJdbcWriter;

    /** @return the job named {@code iteamReaderDbJob} */
    @Bean
    public Job iteamReaderDbJob() {
        Step onlyStep = iteamReaderDbStep();
        return jobBuilderFactory.get("iteamReaderDbJob").start(onlyStep).build();
    }

    /** @return a chunk-oriented step (chunk size two) from the database reader to the writer */
    @Bean
    public Step iteamReaderDbStep() {
        return stepBuilderFactory.get("iteamReaderDbStep")
                .<User, User>chunk(2)
                .reader(dbJdbcReader())
                .writer(dbJdbcWriter)
                .build();
    }

    /**
     * Builds the step-scoped paging reader that selects rows from the {@code user} table.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<User> dbJdbcReader() {
        // Query definition: which columns, which table, and the sort key.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,username,password,age");
        queryProvider.setFromClause("from user");
        queryProvider.setSortKeys(sortKeys);

        JdbcPagingItemReader<User> pagingReader = new JdbcPagingItemReader<User>();
        pagingReader.setDataSource(dataSource);
        // The original note says "read two rows each time".
        // NOTE(review): fetch size and page size are separate settings on this reader —
        // confirm that fetch size is the one intended here.
        pagingReader.setFetchSize(2);
        // Convert every row read into a User object.
        pagingReader.setRowMapper(IteamReaderDBDemo::toUser);
        pagingReader.setQueryProvider(queryProvider);
        return pagingReader;
    }

    /**
     * Maps the current row to a {@code User} using column positions 1-4
     * (id, username, password, age).
     *
     * @param row the result set positioned on the row to map
     * @param rowNumber row number supplied by the row-mapper callback (not used)
     * @return the populated user
     * @throws SQLException if a column cannot be read
     */
    private static User toUser(ResultSet row, int rowNumber) throws SQLException {
        User mapped = new User();
        mapped.setId(row.getInt(1));
        mapped.setUsername(row.getString(2));
        mapped.setPassword(row.getString(3));
        mapped.setAge(row.getInt(4));
        return mapped;
    }
}
写读取出来的数据
package com.jeremy.springbatch.iteamreaderdb;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class DbJdbcWriter implements ItemWriter<User> {
@Override
public void write(List<? extends User> items) throws Exception {
for(User user:items){
System.out.println(user);
}
}
}
3.1 ItemReader从普通文件读(txt)–重点
- writer的实现和Customer对象这里就不贴了,与前面的例子大同小异。
package com.jeremy.springbatch.itemReaderFile;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.PathResource;
import org.springframework.validation.BindException;
/**
 * Demonstrates reading {@code Customer} records from a delimited text file with a
 * {@link FlatFileItemReader}.
 */
@Configuration
public class FileItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private FlatFileWriter flatFileWriter;

    /** @return the job named {@code fileItemReaderDemoJob} */
    @Bean
    public Job fileItemReaderDemoJob() {
        Step onlyStep = fileItemReaderDemoStep();
        return jobBuilderFactory.get("fileItemReaderDemoJob").start(onlyStep).build();
    }

    /** @return a chunk-oriented step (chunk size three) from the file reader to the writer */
    @Bean
    public Step fileItemReaderDemoStep() {
        return stepBuilderFactory.get("fileItemReaderDemoStep")
                .<Customer, Customer>chunk(3)
                .reader(fileRader())
                .writer(flatFileWriter)
                .build();
    }

    /**
     * Builds the step-scoped reader for {@code customer.txt}.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Customer> fileRader() {
        // Splits each line into the four named fields.
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthday"});

        // Maps one tokenized line onto a Customer object.
        FieldSetMapper<Customer> toCustomer = fields -> {
            Customer parsed = new Customer();
            parsed.setId(fields.readLong("id"));
            parsed.setFirstName(fields.readString("firstName"));
            parsed.setLastName(fields.readString("lastName"));
            parsed.setBirthday(fields.readString("birthday"));
            return parsed;
        };

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(toCustomer);
        lineMapper.afterPropertiesSet();

        FlatFileItemReader<Customer> fileReader = new FlatFileItemReader<Customer>();
        // PathResource is used because, per the original note, ClassPathResource raised an error here.
        fileReader.setResource(new PathResource("customer.txt"));
        // Skip the first line, since it may be a header row.
        fileReader.setLinesToSkip(1);
        fileReader.setLineMapper(lineMapper);
        return fileReader;
    }
}
写入数据库(还未完成,待续)
package com.jeremy.springbatch.itemWriterFile;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/**
 * Declares a {@link JdbcBatchItemWriter} that inserts {@code Customer} items into the
 * {@code customer} table.
 */
@Configuration
public class ItemWriterDbconfig {

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the JDBC batch writer for customers.
     *
     * @return the configured writer
     */
    @Bean
    public JdbcBatchItemWriter<Customer> itemWriterDb() {
        // Named placeholders are used instead of concatenated values to guard against SQL injection.
        String insertSql =
                "insert into customer(id,firstName,lastName,birthday)values(:id,:firstName,:lastName,:birthday)";

        JdbcBatchItemWriter<Customer> batchWriter = new JdbcBatchItemWriter<Customer>();
        batchWriter.setDataSource(dataSource);
        batchWriter.setSql(insertSql);
        // Placeholder values are taken from the Customer bean properties of the same name.
        batchWriter.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<Customer>());
        return batchWriter;
    }
}
后面的就不写了参考视频
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/140226.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...