batch spring 重复执行_Spring Batch批处理

batch spring 重复执行_Spring Batch批处理SpringBatch批处理批处理顾名思义是批量处理大量数据,但是这个大量数据又不是特别大的大数据,比Hadoop等要轻量得多,适合企业单位人数薪资计算,财务系统月底一次性结算等常规数据批量处理。SpringBatch是一个用于创建健壮的批处理应用程序的完整框架。您可以创建可重用的函数来处理大量数据或任务,通常称为批量处理。如SpringBatch文档中所述,使用该框架的最常见方案如下:•定…

大家好,又见面了,我是你们的朋友全栈君。

Spring Batch批处理

批处理顾名思义是批量处理大量数据,但是这个大量数据又不是特别大的大数据,比Hadoop等要轻量得多,适合企业单位人数薪资计算,财务系统月底一次性结算等常规数据批量处理。

Spring Batch是一个用于创建健壮的批处理应用程序的完整框架。您可以创建可重用的函数来处理大量数据或任务,通常称为批量处理。

如Spring Batch文档中所述,使用该框架的最常见方案如下:

•定期提交批处理

•并行处理作业的并发批处理

•分阶段,企业消息驱动处理

•大型并行批处理

•手动或故障后的计划重新启动

•依赖步骤的顺序处理(扩展到工作流程驱动的批处理)

•部分处理:跳过记录(例如,回滚时)

•整批事务:对于批量小或现有存储过程的情况/脚本

Spring Batch的特点有:

事务管理,让您专注于业务处理,实现批处理机制,你可以引入平台事务机制或其他事务管理器机制

基于块Chunk的处理,通过将一大段大量数据分成一段段小数据来处理,。

启动/停止/重新启动/跳过/重试功能,以处理过程的非交互式管理。

基于Web的管理界面(Spring Batch Admin),它提供了一个用于管理任务的API。

基于Spring框架,因此它包括所有配置选项,包括依赖注入。

符合JSR 352:Java平台的批处理应用程序。

基于数据库管理的批处理,可与Spring Cloud Task结合,适合分布式集群下处理。

能够进行多线程并行处理,分布式系统下并行处理,变成一种弹性Job分布式处理框架。

Spring批处理的基本单元是Job,你需要定义一个Job代表一次批处理工作,每个Job分很多步骤step,每个步骤里面有两种处理方式Tasklet(可重复执行的小任务)和Chunk(块),掌握Spring Batch主要是将这几个核心概念搞清楚。

batch spring 重复执行_Spring Batch批处理

在SpringBoot架构下,我们只要做一个JobConfig组件作为JobLauncher,使用@Configuration配置,然后完成上图中Job和Step以及ItemReader,ItemProcessor和ItemWriter,后面这三个分别是存在一个步骤里,用于处理条目的输入读 、处理然后输出写出。至于图中JobRepository只要我们在Application.properties中配置上datasource,SpringBoot启动时会自动将batch需要的库表导入到数据库中。

下图是每个步骤内部的事务处理过程,在进行读入 处理和写出时,如果有任何一个步骤出错,将会事务回滚,也只有这三个步骤全部完成,才能提交事务机制,进而完成一个步骤。

batch spring 重复执行_Spring Batch批处理

下面我们看一个简单案例如何使用SpringBatch的,这个案例功能是从一个CSV文件中导入数据到数据库中。

首先导入pom.xml:

org.springframework.boot

spring-boot-starter-batch

mysql

mysql-connector-java

runtime

这里使用MysSQL作为Job仓库,在Application.properties配置:

spring.batch.initialize-schema=always

spring.datasource.url=jdbc:mysql://localhost:3306/mytest

spring.datasource.username=banq

spring.datasource.password=XXX

spring.datasource.driver-class-name=com.mysql.jdbc.Driver

配置了spring.batch.initialize-schema为always这样能自动启动时导入批处理需要的数据库表。

下面我们实现批处理的关键类@Configuration:

首先定义一个Job:

@Bean

public Job importUserJob() {

return jobBuilderFactory.get(“importUserJob”)

.incrementer(new RunIdIncrementer())

.flow(step1())

.end()

.build();

}

这个Job名称是importUserJob,其中使用了步骤step1:

@Bean

public Step step1() {

return stepBuilderFactory.get(“step1”). chunk(3)

.reader(reader())

.processor(processor())

.writer(writer())

.build();

}

这个步骤step1中使用了chunk,分块读取数据处理后输出。下面是依次看看输入 处理和输出的方法:

@Bean

public FlatFileItemReader reader(){

FlatFileItemReader reader = new FlatFileItemReader();

reader.setResource(new ClassPathResource(“users.csv”));

reader.setLineMapper(new DefaultLineMapper() {
{

setLineTokenizer(new DelimitedLineTokenizer() {
{

setNames(new String[] { “name” });

}});

setFieldSetMapper(new BeanWrapperFieldSetMapper() {
{

setTargetType(User.class);

}});

}});

return reader;

}

这是输入,读取classpath下的uers.csv文件:

testdata1

testdata2

testdata3

一次读入三行,提取一行中数据作为User这个对象的name输入其中:

@Entity

public class User {

@Id

@GeneratedValue(strategy = GenerationType.IDENTITY)

private int id ;

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

private String name;

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

}

User是我们的一个实体数据,其中ID使用数据库自增,name由user.csv导入,User对应的数据表schema.sql是:

CREATE TABLE `user` (

`id` int(11) NOT NULL auto_increment,

`name` varchar(45) NOT NULL default ”,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

我们只要在pom.xml中导入JPA包:

org.springframework.boot

spring-boot-starter-data-jpa

并在application.properties中加入,就可以在SpringBoot启动时,自动使用datasource配置的数据库建立User表了。

spring.jpa.generate-ddl=true

下面我们回到批处理,前面定义了输入,下面依次是条目处理:

public class UserItemProcessor implements ItemProcessor {

@Override

public User process(User user) throws Exception {

return user;

}

}

这个条目处理就是对每个User对象进行处理,这时User对象已经包含了从CSV读取的数据,如果希望再进行加工处理就在这里进行。

下面是条目输出:

@Bean

public JdbcBatchItemWriter writer(){

JdbcBatchItemWriter writer = new JdbcBatchItemWriter();

writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());

writer.setSql(“INSERT INTO user(name) VALUES (:name)”);

writer.setDataSource(dataSource);

return writer;

}

每一行数据我们从CSV读出以后放入到User中,然后再插入数据表user保存。

至此,我们简单完成了一个批处理开发过程,具体代码见 Github

下面我们会展示更多Springbatch特性:

Spring批处理远程分块 实现主从计算的分布式批处理架构

Spring批处理分区 对数据进行分片sharding后分区用多线程或分布式系统处理

其他专题

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/140434.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 分布式事务 java代码_Java分布式事务概念与实现示例[通俗易懂]

    分布式事务 java代码_Java分布式事务概念与实现示例[通俗易懂]在java中有如下三种事务,简单的JDBC级的事务JTA-在EJB环境下,用户得到事务并进行控制CMP-完全由容器控制事务,用户通过Bean配置文件来定义事务行为二三种都支持分布式事务,但只支持Java环境下的分布式事务。下面讨论如何在Java程序里实现分布式事务,即在同一个事务里访问多个数据源。实际上就是如何使用JTA.这里假设使用oracle数据库,使用webLogic部署应用,所要做…

  • PHP解决高并发问题

    PHP解决高并发问题我们通常衡量一个Web系统的吞吐率的指标是QPS(QueryPerSecond,每秒处理请求数),解决每秒数万次的高并发场景,这个指标非常关键。举个例子,我们假设处理一个业务请求平均响应时间为100ms,同时,系统内有20台Apache的Web服务器,配置MaxClients为500个(表示Apache的最大连接数目)。那么,我们的Web系统的理论峰值QPS为(理想化的计算方式):20*500/0.1=100000(10万QPS)咦?我们的系统似乎很强大,1秒钟可以处理完10万的请求,5w/

  • mysql blob数据类型_MySQL中三种锁的特点

    mysql blob数据类型_MySQL中三种锁的特点先说明一下Blob的类型,直接从网上摘抄了!!!1、MySQL有四种BLOB类型:  ·tinyblob:仅255个字符  ·blob:最大限制到65K字节  ·mediumblob:限制到16M字节  ·longblob:可达4GB2、除了类型对后面存取文件大小有限制,还要修改mysql的配置文件。  Windows、linux基本一样通过修改文件my.ini或my.cnf文件,在…

    2022年10月23日
  • 【STM32】HAL库 STM32CubeMX教程六—-定时器中断

    【STM32】HAL库 STM32CubeMX教程六—-定时器中断前言:今天我们来学习定时器,32的定时器有着非常丰富的功能,输入捕获/输出比较,PWM,中断等等。是我们学习STM32最频繁使用到的外设之一,所以一定要掌握好,这节我们讲解定时器中断,本系列教程将对应外设原理,HAL库与STM32CubeMX结合在一起讲解,使您可以更快速的学会各个模块的使用所用工具:1、芯片:STM32F407ZET6/STM32F103ZET62、ST…

  • java collections.sort_java中

    java collections.sort_java中importjava.awt.BorderLayout;importjava.awt.GridLayout;importjava.awt.image.BufferedImage;importjavax.swing.ImageIcon;importjavax.swing.JButton;importjavax.swing.JFrame;importjavax.swing.JLabel;…

  • 一篇文章彻底搞懂浅拷贝和深拷贝的区别_深拷贝和浅拷贝的题

    一篇文章彻底搞懂浅拷贝和深拷贝的区别_深拷贝和浅拷贝的题强烈推荐30个原生JavaScript的demo,包括canvas时钟特效、自定义视频播放器、搜索栏快速匹配、fetch访问资源、console调试技巧等,先fork后学习,详见点击打开链接,欢迎点赞~~~谢谢,共同进步学习!【javascript】详解javaScript的深拷贝目录浅谈深拷贝和浅拷贝 深拷贝和浅拷贝的区别 为什么要使用深拷贝? 深拷贝的要求程度…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号