springbatch+mysql



Reference video: https://www.iqiyi.com/v_19rr1myxr8.html

Key section: see 3.0, reading from the database.

Overall architecture

[Figure: overall architecture diagram; image not available]

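Notes:

  1. When batch-processing database data, watch out for large CLOB fields and for date/time handling.
  • If a row contains a CLOB field, convert the retrieved value to a String before writing it to the database. Sample code follows.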
// Convert a Clob to a String.
// Needs imports: java.io.BufferedReader, java.io.IOException, java.sql.Clob, java.sql.SQLException
    public static String ClobToString(Clob clob) throws SQLException, IOException {
        if (clob == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        // try-with-resources closes the reader (and the underlying stream) even on failure
        try (BufferedReader br = new BufferedReader(clob.getCharacterStream())) {
            String s;
            // readLine() strips line terminators, so line breaks in the CLOB are not kept
            while ((s = br.readLine()) != null) {
                sb.append(s);
            }
        }
        return sb.toString();
    }
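The helper above reads the CLOB line by line, so any line breaks in the content are dropped. As a minimal sketch of an alternative, assuming the CLOB is small enough to fit in a single String, `Clob.getSubString` returns the whole content in one call. The class `ClobText` and its method names are made up for illustration, and `SerialClob` is used only to build an in-memory CLOB for the demo.

```java
import java.sql.Clob;
import java.sql.SQLException;
import javax.sql.rowset.serial.SerialClob;

// Hypothetical helper class, not part of the original article.
class ClobText {

    // Returns the full CLOB content, line breaks included.
    // Clob positions are 1-based, so the first character is at position 1.
    static String toStringKeepingLineBreaks(Clob clob) throws SQLException {
        if (clob == null) {
            return "";
        }
        long length = clob.length();
        if (length == 0) {
            return "";
        }
        if (length > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("CLOB too large for a single String");
        }
        return clob.getSubString(1, (int) length);
    }

    // Demo helper: wraps text in an in-memory CLOB and converts it back.
    static String roundTrip(String text) {
        try {
            return toStringKeepingLineBreaks(new SerialClob(text.toCharArray()));
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("first line\nsecond line"));
    }
}
```

Whether dropping or keeping line breaks is right depends on what the target column is expected to hold, so treat this as one option rather than a replacement.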

1. Setting up the Spring Batch framework

1. Required dependencies. Spring Batch needs a DataSource to be configured, so the database driver jar is included as well.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- A data source must be configured -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.11</version>
</dependency>

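2. The configuration file, application.properties. The last two lines matter because Spring Batch depends on a set of metadata tables: the second-to-last line points at the schema-mysql.sql script that ships with Spring Batch, and the last line tells Spring Boot to initialize that schema at startup. When you run the application class, the required tables are created in the database.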

spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/springbatch?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
# The setting below creates the tables defined in schema-mysql.sql (above) when the application initializes
spring.batch.initialize-schema=always

2. The simplest example; it is loaded when the application class starts

2.1 A single, minimal step

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {

    // Inject the factory that creates Job objects
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    // What a job actually does is defined by its steps
    // Inject the factory that creates Step objects
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Create the job
    @Bean
    public Job helloWorldJob(){
       return jobBuilderFactory.get("helloWorldJob") // job name: helloWorldJob
               .start(step1())  // run a single step
               .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1") // step name: step1
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello world");
                        return RepeatStatus.FINISHED; // finished normally
                    }
                }).build();
    }
}

2.2 Running multiple steps in one job

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing // enables batch processing
public class JobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory; // factory that creates Job objects; it must be injected
    @Autowired
    private StepBuilderFactory stepBuilderFactory; // factory that creates Step objects

    @Bean
    // Build a job made up of three steps
    public Job jobDemoJob(){
        return jobBuilderFactory.get("jobDemojob") // name of the job
               /* .start(step1())  // option 1: chain the three steps in order
                .next(step2())
                .next(step3())
                .build();*/
               // option 2: move to the next step only when a condition holds; on(...) is the condition, to(...) is the target step
                .start(step1())
                .on("COMPLETED").to(step2())
                .from(step2()).on("COMPLETED").to(step3())
                .from(step3()).end()
                .build();
    }

    // step2 runs only after step1 has completed
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step1");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step2");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    public Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step3");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
}
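The on(...).to(...) chain in option 2 can be read as a transition table: for each step, an exit status selects the next step. The following is a simplified plain-Java model of that idea, not Spring Batch's implementation. The class `TransitionModel` and its methods are hypothetical, and a step with no recorded status is assumed to finish with COMPLETED.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model class; it only imitates the idea of conditional transitions.
class TransitionModel {

    // Key: "stepName:exitStatus", value: name of the next step.
    private final Map<String, String> transitions = new LinkedHashMap<>();

    TransitionModel on(String from, String status, String to) {
        transitions.put(from + ":" + status, to);
        return this;
    }

    // Starts at the given step and follows transitions until none matches.
    List<String> run(String start, Map<String, String> exitStatuses) {
        List<String> executed = new ArrayList<>();
        String current = start;
        while (current != null) {
            executed.add(current);
            String status = exitStatuses.getOrDefault(current, "COMPLETED");
            current = transitions.get(current + ":" + status);
        }
        return executed;
    }

    // Wires step1 -> step2 -> step3 on COMPLETED and runs with the given status for step2.
    static String demo(String step2Status) {
        TransitionModel model = new TransitionModel()
                .on("step1", "COMPLETED", "step2")
                .on("step2", "COMPLETED", "step3");
        return String.join(" -> ", model.run("step1", Map.of("step2", step2Status)));
    }

    public static void main(String[] args) {
        System.out.println(demo("COMPLETED")); // step1 -> step2 -> step3
        System.out.println(demo("FAILED"));    // step1 -> step2
    }
}
```

In this model a step whose status matches no transition simply ends the run; how a real job reports that situation is up to Spring Batch and is not modeled here.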

2.3 A flow containing multiple steps

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class FlowDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step flowDemoSetp1(){
        return stepBuilderFactory.get("flowDemoStep1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("flowDemoStep1");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Step flowDemoSetp2(){
        return stepBuilderFactory.get("flowDemoStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("flowDemoStep2");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step flowDemoSetp3(){
        return stepBuilderFactory.get("flowDemoStep3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("flowDemoStep3");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    // Create the Flow and declare which steps it contains
    @Bean
    public Flow flowDemoFlow(){
        return new FlowBuilder<Flow>("flowDemoFlow")
                .start(flowDemoSetp1())
                .next(flowDemoSetp2())
                .build();
    }
    // Create the Job
    @Bean
    public Job flowDemoJob(){
        return jobBuilderFactory.get("flowDemoJob")
                .start(flowDemoFlow())
                .next(flowDemoSetp3())
                .end()
                .build();
    }
}

2.4 Using split to run flows concurrently (multi-threaded)

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class SplitDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step splitDemo1(){
        return stepBuilderFactory.get("splitDemo1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("splitDemo1");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step splitDemo2(){
        return stepBuilderFactory.get("splitDemo2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("splitDemo2");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step splitDemo3(){
        return stepBuilderFactory.get("splitDemo3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("splitDemo3");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    // Create the flows
    @Bean
    public Flow splitDemoFlow1(){
        return new FlowBuilder<Flow>("splitDemoFlow1")
                .start(splitDemo1())
                .build();
    }
    @Bean
    public Flow splitDemoFlow2(){
        return new FlowBuilder<Flow>("splitDemoFlow2")
                .start(splitDemo2())
                .next(splitDemo3())
                .build();
    }

    // Create the job; the two flows run concurrently
    @Bean
    public Job splitDemoJob(){
        return jobBuilderFactory.get("splitDemoJob")
                .start(splitDemoFlow1())
                .split(new SimpleAsyncTaskExecutor()).add(splitDemoFlow2())
                .end()
                .build();
    }
}
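What the split does can be pictured with plain Java threads: each flow is handed to its own thread, the steps inside one flow still run in order, and the job waits for every flow to finish. The sketch below is only a model of that scheduling idea using `ExecutorService`; the class `SplitModel` is hypothetical and steps are reduced to names appended to a log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model class; it only imitates the scheduling idea.
class SplitModel {

    // Runs each flow on its own thread and waits for all of them.
    // Returns the step names in the order they actually ran.
    static List<String> runSplit(List<List<String>> flows) throws Exception {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(flows.size());
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (List<String> flow : flows) {
                // Steps of one flow are appended sequentially by a single thread.
                pending.add(pool.submit(() -> flow.forEach(log::add)));
            }
            for (Future<?> future : pending) {
                future.get(); // wait for this flow to finish
            }
        } finally {
            pool.shutdown();
        }
        return log;
    }

    // True when all three steps ran and splitDemo2 ran before splitDemo3.
    static boolean demoKeepsOrderWithinFlow() {
        try {
            List<String> log = runSplit(List.of(
                    List.of("splitDemo1"),
                    List.of("splitDemo2", "splitDemo3")));
            return log.size() == 3
                    && log.contains("splitDemo1")
                    && log.indexOf("splitDemo2") < log.indexOf("splitDemo3");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runSplit(List.of(
                List.of("splitDemo1"),
                List.of("splitDemo2", "splitDemo3"))));
    }
}
```

The position of splitDemo1 relative to the other two steps can differ from run to run, which is the visible effect of running the flows concurrently.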

2.5 Using a decider

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Decider demo
@Configuration
@EnableBatchProcessing
public class DeciderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Create the steps
    @Bean
    public Step deciderDemoStep1(){
        return stepBuilderFactory.get("deciderDemoStep1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("deciderDemoStep1");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step deciderDemoStep2(){
        return stepBuilderFactory.get("deciderDemoStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("even");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step deciderDemoStep3(){
        return stepBuilderFactory.get("deciderDemoStep3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("odd");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    // Create the decider
    @Bean
    public JobExecutionDecider myDecider(){
        return new MyDecider();
    }
    // Create the job
    @Bean
    public Job deciderDemoJob(){
        return  jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                .from(deciderDemoStep3()).on("*").to(myDecider()) // after step3, whatever its exit status, go back to myDecider(); the count is then even, so the flow continues to step2
                .end()
                .build();
    }
}

The custom decider

package com.jeremy.springbatch.config;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class MyDecider implements JobExecutionDecider {
    private int count;
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count++;
        if(count%2==0){
            return new FlowExecutionStatus("even");
        }else {
            return new FlowExecutionStatus("odd");
        }
    }
}
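To see which steps this wiring visits, the decision rule and the transitions can be traced in plain Java. This is a sketch under the assumption that the decider's counter starts at 0 for the run; the class `DeciderTrace` is hypothetical and only mirrors the even/odd rule and the transitions declared in deciderDemoJob.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the decider flow; no Spring Batch classes involved.
class DeciderTrace {

    private int count;

    // Same rule as MyDecider: "odd" on the 1st, 3rd, ... call, "even" on the 2nd, 4th, ...
    String decide() {
        count++;
        return (count % 2 == 0) ? "even" : "odd";
    }

    // Follows the transitions: step1 -> decider; even -> step2; odd -> step3 -> decider.
    static String trace() {
        DeciderTrace decider = new DeciderTrace();
        List<String> visited = new ArrayList<>();
        visited.add("deciderDemoStep1");
        while (true) {
            String status = decider.decide();
            if (status.equals("even")) {
                visited.add("deciderDemoStep2");
                break; // nothing is wired after step2, so the flow ends
            }
            visited.add("deciderDemoStep3"); // "*" then leads back to the decider
        }
        return String.join(" -> ", visited);
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

Under that assumption the trace is deciderDemoStep1, then deciderDemoStep3 (first decision is odd), then deciderDemoStep2 (second decision is even).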

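2.6 Nested jobs

A job can be nested inside another job. The nested job is called the child job and the outer one the parent job. A child job is not meant to run on its own; the parent job launches it.
Example: create two jobs to act as child jobs, then a third job as the parent.
The two child jobs

First child job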

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableMBeanExport;

@Configuration
//@EnableBatchProcessing -- omitted here because it is declared on the application class
public class ChildJob1 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step childJob1Step1(){
        return stepBuilderFactory.get("childJob1Step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("childJob1Step1");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Job childJobOne(){
        return jobBuilderFactory.get("childJobOne")
                .start(childJob1Step1())
                .build();
    }

}



Second child job

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
//@EnableBatchProcessing -- omitted here because it is declared on the application class
public class ChildJob2 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step childJob2Step1(){
        return stepBuilderFactory.get("childJob2Step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("childJob2Step1");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Step childJob2Step2(){
        return stepBuilderFactory.get("childJob2Step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("childJob2Step2");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Job childJobTow(){
        return jobBuilderFactory.get("childJobTow")
                .start(childJob2Step1())
                .next(childJob2Step2())
                .build();
    }

}


Parent job

package com.jeremy.springbatch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.job.JobStep;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class NestedDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private Job childJobOne;
    @Autowired
    private Job childJobTow;
    @Autowired
    private JobLauncher jobLauncher;
    @Bean
    public Job parentJobs(JobRepository repository, PlatformTransactionManager transactionManager){
        return jobBuilderFactory.get("parentJobs")
                .start(childJob1(repository,transactionManager))
                .next(childJob2(repository,transactionManager))
                .build();
    }

    // Returns a step of type JobStep, a special step that wraps a job
    private Step childJob2(JobRepository repository, PlatformTransactionManager transactionManager) {
        return new JobStepBuilder(new StepBuilder("childJob2"))
                .job(childJobTow)
                .launcher(jobLauncher) // use the same launcher that starts the parent job
                .repository(repository)
                .transactionManager(transactionManager)
                .build();
    }

    private Step childJob1(JobRepository repository, PlatformTransactionManager transactionManager) {
        return new JobStepBuilder(new StepBuilder("childJob1"))
                .job(childJobOne)
                .launcher(jobLauncher) // use the same launcher that starts the parent job
                .repository(repository)
                .transactionManager(transactionManager)
                .build();
    }
}

Because only the parent job should be launched, not the child jobs, set the job name in application.properties:
# Name of the job to launch
spring.batch.job.names=parentJobs

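2.7 Using listeners. This example listens on the job; steps, chunks, reads, processing, and writes can be listened to as well.

There are two ways to configure a listener: implement the listener interface, or use annotations.

  • Approach 1: interface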
package com.jeremy.springbatch.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class MyJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName()+"before..");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName()+"after..");
    }
}

  • Approach 2: annotations
package com.jeremy.springbatch.listener;

import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.BeforeChunk;
import org.springframework.batch.core.scope.context.ChunkContext;

public class MyChunkListener {

    @BeforeChunk
    public void beforeChunk(ChunkContext context){
        System.out.println(context.getStepContext().getStepName()+"before");
    }
    @AfterChunk
    public void afterChunk(ChunkContext context){
        System.out.println(context.getStepContext().getStepName()+"after");
    }
}
  • Using the listeners
package com.jeremy.springbatch.listener;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@Configuration
public class ListenerDemo {
   @Autowired
   private JobBuilderFactory jobBuilderFactory;
   @Autowired
   private StepBuilderFactory stepBuilderFactory;

   @Bean
   public Job listenerJob(){
       return jobBuilderFactory.get("listenerJob")
               .start(step11())
               .listener(new MyJobListener())
               .build();
   }

   @Bean
   public Step step11() {
       return stepBuilderFactory.get("step11")
               .<String,String>chunk(2) // process after every two items read; a chunk step can read, process, and write, and the generic types can be specified
               .faultTolerant()
               .listener(new MyChunkListener())
               .reader(read())
               .writer(writer())
               .build();
   }

   @Bean
   public ItemWriter<String> writer() {
       return new ItemWriter<String>() {
           @Override
           public void write(List<? extends String> items) throws Exception {
               for(String item:items){
                   System.out.println(item);
               }
           }
       };
   }


   @Bean
   public ItemReader<String> read() {
       return new ListItemReader<>(Arrays.asList("java","spring","mybatis"));
   }
}

2.8 Job parameters

2.9 A simple ItemReader implementation

package com.jeremy.springbatch.itemreader;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

@Configuration
public class ItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job itemReaderDemoJob(){
        return jobBuilderFactory.get("itemReaderDemoJob")
                .start(itemReaderDemoSteo())
                .build();
    }

    @Bean
    public Step itemReaderDemoSteo() {
        return stepBuilderFactory.get("itemReaderDemoSteo")
                .<String,String>chunk(2) // hand items to the writer after every two reads
                .reader(itemReaderDemoRead())
                .writer(list -> {
                    for(Object item:list){
                        System.out.println(item+"...");
                    }
                }).build();
    }

    @Bean
    public MyReader itemReaderDemoRead() {
        List<String> data = Arrays.asList("cat","dog","pig","duck");
        return new MyReader(data);
    }
}

The ItemReader implementation

package com.jeremy.springbatch.itemreader;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import java.util.Iterator;
import java.util.List;

public class MyReader implements ItemReader<String> {
    private final Iterator<String> iterator;
    public MyReader(List<String> list) {
        this.iterator = list.iterator();
    }

    @Override
    public String read() throws Exception {
        // Return one item per call; null tells Spring Batch the input is exhausted
        if(iterator.hasNext()){
            return iterator.next();
        }else {
            return null;
        }
    }
}
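How chunk(2) combines with a reader like this one can be modeled in a few lines of plain Java: items are read one at a time until the chunk is full or the reader returns null, and each full (or final partial) chunk is handed to the writer as a list. This is a simplified sketch, not Spring Batch's actual loop; the class `ChunkModel` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of chunk-oriented reading and writing.
class ChunkModel {

    // Groups the data into the lists a writer would receive for the given chunk size.
    static List<List<String>> chunks(List<String> data, int chunkSize) {
        Iterator<String> iterator = data.iterator();
        List<List<String>> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        while (true) {
            // Like ItemReader.read(): one item per call, null when the input is exhausted.
            String item = iterator.hasNext() ? iterator.next() : null;
            if (item != null) {
                chunk.add(item);
            }
            boolean full = chunk.size() == chunkSize;
            boolean lastPartial = item == null && !chunk.isEmpty();
            if (full || lastPartial) {
                written.add(chunk); // "write" the chunk
                chunk = new ArrayList<>();
            }
            if (item == null) {
                break;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(chunks(Arrays.asList("cat", "dog", "pig", "duck"), 2));
        System.out.println(chunks(Arrays.asList("java", "spring", "mybatis"), 2));
    }
}
```

With the four animals and a chunk size of 2 the writer sees two lists of two items; with three items it sees one list of two and one list of one.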

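3.0 ItemReader: reading from a database (key section)

Key point: a DataSource must be configured. Here it is configured in application.properties, so Spring Boot creates and manages it automatically.

  • The reader below fetches rows from the database; most ordinary reads can be done this way.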
package com.jeremy.springbatch.iteamreaderdb;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class IteamReaderDBDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private DbJdbcWriter dbJdbcWriter;

    @Bean
    public Job iteamReaderDbJob(){
        return jobBuilderFactory.get("iteamReaderDbJob")
                .start(iteamReaderDbStep())
                .build();

    }

    @Bean
    public Step iteamReaderDbStep() {
        return stepBuilderFactory.get("iteamReaderDbStep")
                .<User,User>chunk(2)
                .reader(dbJdbcReader())
                .writer(dbJdbcWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> dbJdbcReader() { // JdbcPagingItemReader reads rows from the database page by page
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<User>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(2); // JDBC fetch-size hint: two rows at a time
        // Convert each row that is read into a User object
        reader.setRowMapper(new RowMapper<User>() {
            @Override
            // i is the number of the current row
            public User mapRow(ResultSet resultSet, int i) throws SQLException {
                User user = new User();
                user.setId(resultSet.getInt(1));
                user.setUsername(resultSet.getString(2));
                user.setPassword(resultSet.getString(3));
                user.setAge(resultSet.getInt(4));
                return user;
            }
        });
        // Build the SQL query
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,username,password,age"); // columns to select
        provider.setFromClause("from user"); // table to read from
        // Column to sort by
        Map<String, Order> sort = new HashMap<>(1);
        sort.put("id",Order.ASCENDING);
        provider.setSortKeys(sort);

        reader.setQueryProvider(provider);
        return reader;
    }

}
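The sort key in the reader above is what lets a paging reader pick up where the previous page ended. As a rough plain-Java model of that idea (not the SQL the query provider actually generates), each page takes the rows whose id is greater than the last id already seen, in ascending order, up to the page size. The class `PagingModel` and the in-memory id list are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of sort-key paging over an in-memory list of ids.
class PagingModel {

    // In spirit: SELECT id FROM user WHERE id > lastId ORDER BY id ASC LIMIT pageSize
    static List<Integer> nextPage(List<Integer> ids, int lastId, int pageSize) {
        return ids.stream()
                .filter(id -> id > lastId)
                .sorted()
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Reads every page in turn until an empty page signals the end.
    static List<List<Integer>> readAll(List<Integer> ids, int pageSize) {
        List<List<Integer>> pages = new ArrayList<>();
        int lastId = Integer.MIN_VALUE;
        while (true) {
            List<Integer> page = nextPage(ids, lastId, pageSize);
            if (page.isEmpty()) {
                break;
            }
            pages.add(page);
            lastId = page.get(page.size() - 1); // remember where this page ended
        }
        return pages;
    }

    public static void main(String[] args) {
        System.out.println(readAll(Arrays.asList(5, 1, 4, 2, 3), 2));
    }
}
```

Without a unique, stable sort column, consecutive pages could skip or repeat rows, which is why the reader requires one.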

Writing out the data that was read

package com.jeremy.springbatch.iteamreaderdb;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class DbJdbcWriter implements ItemWriter<User> {
    @Override
    public void write(List<? extends User> items) throws Exception {
        for(User user:items){
            System.out.println(user);
        }
    }
}

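3.1 ItemReader: reading from a plain text file (txt) (key section)

  • The writer is omitted here because it is much the same as before; a Customer class is also needed.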
package com.jeremy.springbatch.itemReaderFile;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.PathResource;
import org.springframework.validation.BindException;

@Configuration
public class FileItemReaderDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private FlatFileWriter flatFileWriter;
    @Bean
    public Job fileItemReaderDemoJob(){
        return jobBuilderFactory.get("fileItemReaderDemoJob")
                .start(fileItemReaderDemoStep())
                .build();
    }

    @Bean
    public Step fileItemReaderDemoStep() {
        return stepBuilderFactory.get("fileItemReaderDemoStep")
                .<Customer,Customer>chunk(3)
                .reader(fileRader())
                .writer(flatFileWriter)
                .build();
    }

    @Bean
    @StepScope // scoped to the step
    public FlatFileItemReader<Customer> fileRader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>();
        reader.setResource(new PathResource("customer.txt")); // originally a ClassPathResource, which threw an error
        reader.setLinesToSkip(1); // skip the first line, since it may be a header row
        // Parse each line
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthday"});
        // Map each parsed line to a Customer object
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {
            @Override
            public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
                Customer customer = new Customer();
                customer.setId(fieldSet.readLong("id"));
                customer.setFirstName(fieldSet.readString("firstName"));
                customer.setLastName(fieldSet.readString("lastName"));
                customer.setBirthday(fieldSet.readString("birthday"));
                return customer;
            }
        });
        mapper.afterPropertiesSet();
        reader.setLineMapper(mapper);
        return reader;
    }
}
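
As a rough picture of what the reader does with `customer.txt`, here is a minimal plain-Java sketch that skips the header line and splits each remaining line on commas (the default delimiter of `DelimitedLineTokenizer`). The class `DelimitedLineSketch` and the sample rows are made up for this illustration; the actual file contents are not shown in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of Spring Batch.
class DelimitedLineSketch {

    // Field names, in the same order as in tokenizer.setNames(...).
    static final String[] NAMES = {"id", "firstName", "lastName", "birthday"};

    // Skips the first linesToSkip lines and splits the rest on commas.
    static List<String[]> parse(List<String> lines, int linesToSkip) {
        List<String[]> rows = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            String[] fields = lines.get(i).split(",", -1);
            if (fields.length != NAMES.length) {
                // A line with the wrong number of fields is rejected.
                throw new IllegalArgumentException("Unexpected field count in line " + (i + 1));
            }
            rows.add(fields);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample data in the layout the reader expects.
        List<String> lines = Arrays.asList(
                "id,firstName,lastName,birthday",
                "1,Stone,Barrett,1964-03-19",
                "2,Raymond,Pace,1977-12-11");
        for (String[] row : parse(lines, 1)) {
            System.out.println(NAMES[1] + "=" + row[1] + ", " + NAMES[3] + "=" + row[3]);
        }
    }
}
```

In the real configuration the `FieldSetMapper` plays the role of the loop body: it reads each named field from the `FieldSet` and copies it into a `Customer`.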


Writing to the database (not finished yet; still to be investigated)

package com.jeremy.springbatch.itemWriterFile;

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class ItemWriterDbconfig  {

    @Autowired
    private DataSource dataSource;
    @Bean
    public JdbcBatchItemWriter<Customer> itemWriterDb(){
       JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<Customer>();
       writer.setDataSource(dataSource);
       writer.setSql("insert into customer(id,firstName,lastName,birthday) values " +
               "(:id,:firstName,:lastName,:birthday)"); // named placeholders guard against SQL injection
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Customer>());
        return writer;
    }
}
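
The `:id`, `:firstName`, ... placeholders in the SQL above are filled from the matching properties of each `Customer`. The plain-Java sketch below shows the general idea under simplifying assumptions: the named placeholders are turned into positional `?` markers and the values are collected in the same order, so they are passed to the database as data and never concatenated into the SQL text. `NamedParameterSketch` is a hypothetical class, and a `Map` stands in for the bean properties that `BeanPropertyItemSqlParameterSourceProvider` would read; Spring's own parsing is more thorough than this regular expression.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration class; not part of Spring.
class NamedParameterSketch {

    // Matches placeholders such as :id or :firstName.
    private static final Pattern NAMED = Pattern.compile(":(\\w+)");

    // Replaces every named placeholder with a positional '?' marker.
    static String toPositionalSql(String sql) {
        return NAMED.matcher(sql).replaceAll("?");
    }

    // Collects the values for the placeholders in the order they appear in the SQL.
    static List<Object> bindValues(String sql, Map<String, Object> properties) {
        List<Object> values = new ArrayList<>();
        Matcher matcher = NAMED.matcher(sql);
        while (matcher.find()) {
            String name = matcher.group(1);
            if (!properties.containsKey(name)) {
                throw new IllegalArgumentException("No value for :" + name);
            }
            values.add(properties.get(name));
        }
        return values;
    }

    public static void main(String[] args) {
        String sql = "insert into customer(id,firstName,lastName,birthday) values "
                + "(:id,:firstName,:lastName,:birthday)";
        // Made-up values standing in for the properties of one Customer.
        Map<String, Object> customer = new LinkedHashMap<>();
        customer.put("id", 1L);
        customer.put("firstName", "Stone");
        customer.put("lastName", "Barrett");
        customer.put("birthday", "1964-03-19");
        System.out.println(toPositionalSql(sql));
        System.out.println(bindValues(sql, customer)); // [1, Stone, Barrett, 1964-03-19]
    }
}
```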

I won't write up the rest; see the reference video.

Published by 全栈程序员-用户IM. When reposting, please cite the source: https://javaforall.cn/140226.html Original link: https://javaforall.cn