大家好,又见面了,我是你们的朋友全栈君。
springbatch 批处理框架整理
springbatch 批处理框架整理
(还在整理中。。。。。。。。有点乱,待更新)
Spring Batch 是什么? 官网中介绍 Spring Batch is a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。)相对于他的特点定义我们肯定更倾向于他的使用的业务场景以及他是如何运作的。下面的篇幅将介绍整个springbatch的使用业务场景和它的结构原理以及如何去使用它们(最后会通过一个demo来演示)。
springbatch结合springboot 的demo:https://github.com/kellypipe/springbatch-springboot-demo
1、使用场景
对于没有相关经验的初学者,下面是需要批处理的一些场景,并且如果使用Spring Batch 很可能会节省你很多宝贵的时间:
- 接收的文件缺少了一部分需要的信息,你需要读取并解析整个文件,调用某个服务来获得缺少的那部分信息,然后写入到某个输出文件,供其他批处理程序使用。
- 如果执行环境中发生了一个错误,则将失败信息写入数据库。 有专门的程序每隔15分钟来遍历一次失败信息,如果标记为可以重试,那就再执行一次。
- 在工作流中,你希望其他系统在收到事件消息时,来调用某个特定服务。 如果其他系统没有调用这个服务,那么一段时间后需要自动清理过期数据,以避免影响到正常的业务流程。
- 每天收到员工信息更新的文件,你需要为新员工建立相关档案和账号(artifacts)。
- 有些定制订单的服务。 你需要在每天晚上执行批处理程序来生成清单文件,并将它们发送到相应的供应商手上。
典型的批处理程序通常是从数据库、文件或队列中读取大量数据,然后通过某些方法处理数据,最后将处理好格式的数据写回库。对于批处理经验少的开发者来说,编写批处理程序来处理GB级别数据量无疑是种海啸般难以面对的任务,但我们可以用Spring Batch将其拆解为小块小块的(chunk)。 Spring Batch 是Spring框架的一个模块,专门设计来对各种类型的文件进行批量处理。
- 定期提交批处理任务
- 并发批处理:并行执行任务
- 分阶段,企业消息驱动处理
- 高并发批处理任务
- 失败后手动或定时重启
- 按顺序处理任务依赖(使用工作流驱动的批处理插件)
- 局部处理:跳过记录(例如在回滚时)
- 完整的批处理事务:因为可能有小数据量的批处理或存在存储过程/脚本
总的来说,springbatch 封装了一些细节操作(比如批处理数据的时候不需要我们自己去考虑如何去读取数据,如何去操作数据,如何去写入数据,这些框架都封装了),我们需要关注整个批处理任务的流程就可以了;
2、框架结构
上面图是已经使用了几十年的批处理参考体系结构的简化版本。它概述了组成批处理领域的组件.Spring Batch 在系统中提供了健壮的、可维护的常见的层、组件和技术服务的物理实现,这些系统用于创建简单到复杂的批处理应用程序,其基础结构和扩展可以满足非常复杂的处理需求。
上图能明显看到有4个主要角色:
- JobLauncher:是任务启动器,通过它来启动任务,可以看做是程序的入口。
- Job代表着一个具体的任务。
- Step代表着一个具体的步骤,一个Job可以包含多个Step.在实际业务场景中,可能一个任务很复杂,这个时候可以将任务 拆分成多个step,分别对这些step 进行管理(将一个复杂任务简单化)。(这些step 默认是串行执行,也可以并行执行,根据具体的业务场景来使用)。每一个都有一个ItemReader(读取数据),一个ItemProcessor(处理数据)和一个ItemWriter(写入数据)
- JobRepository:批处理框架执行过程中的上下文(元数据)–这个有两种实现一种是通过内存来管理,一种是进行持久化到数据库。
2.1 JobLauncher
JobLauncher是任务启动器,该接口只有一个run方法
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
除了传入Job对象之外,还需要传入JobParameters对象,后续讲到Job再解释为什么要多传一个JobParameters。通过JobLauncher可以在Java程序中调用批处理任务,也可以通过命令行或者其他框架(如定时调度框架Quartz、Web后台框架Spring MVC)中调用批处理任务。Spring Batch框架提供了一个JobLauncher的实现类SimpleJobLauncher。
2.2、Job
在Spring批处理中,作业只是步骤实例的容器。它将逻辑上属于流中的多个步骤组合在一起,并允许对所有步骤进行属性全局配置。
- 简单的工作名称。
- 步骤实例的定义和排序。
- 工作是否可以重新开始
考虑到任务可能不是只执行一次就再也不执行了,更多的情况可能是定时任务,如每天执行一次,每个星期执行一次等等,那么为了区分每次执行的任务,框架使用了JobInstance。如上图所示,Job是一个EndOfDay(每天最后时刻执行的任务),那么其中一个JobInstance就代表着2007年5月5日那天执行的任务实例。框架通过在执行JobLauncher.run(Job, JobParameters)方法时传入的JobParameters来区分是哪一天的任务。
由于2007年5月5日那天执行的任务可能不会一次就执行完成,比如中途被停止,或者出现异常导致中断,需要多执行几次才能完成,所以框架使用了JobExecution来表示每次执行的任务。
SimpleJob 是Spring Batch默认简单实现
类,它在Job之上创建一些标准功能。在使用基于java的配置时,可以使用一组构建器来实例化作业,如下面的示例所示。
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
2.2.1、JobInstance
JobInstance指的是 逻辑作业运行单元的概念。考虑一个批作业,可能不仅仅执行一次。例如前面图中的“EndOfDay”作业,批作业在每一天结束时运行一次。所以对每个作业的运行必须单独逻辑的JobInstance跟踪。比如1月1日的跑步,1月2日的跑步,等等。如果1月1日的第一次运行失败,第二天再次运行,它仍然是
1月1日的上下文。因此,每个JobInstance可以有多个执行(本章后面将更详细地讨论JobExecution),并且在给定时间内,相同参数的同一个JobInstance只能有一个能运行。
JobInstance的定义与要加载的数据完全没有关系。如何加载数据完全取决于ItemReader实现在EndOfDay场景中,数据上可能有一列表示“有效日期”或
数据所属的“调度日期”。因此,1月1日的运行将只加载第1次的数据,而1月2日的运行将只使用第2次的数据。因为这个决定可能是一个业务决策,所以由ItemReader决定。但是,使用相同的JobInstance决定是否使用以前执行的“state”(即ExecutionContext,将在本章后面讨论)。使用新的JobInstance意思是“从头开始”,而使用现有实例通常表示“从头开始”
2.2.1、JobParameters
在讨论了JobInstance与Job的区别之后,很自然地要问:“一个JobInstance与另一个区别在哪里?”答案是:JobParameters。JobParameters对象持有一组用于启动批作业的参数。在运行过程中,它们可以用于区别不同的jobinstance,甚至作为实例运行的数据,如下图所示
[外链图片转存失败(img-Fn9VOCSp-1564901336408)
在前面的示例中,有两个jobInstance,一个用于1月1日,另一个用于1月1日
1月2日,实际上只有一个Job,但是它有两个JobParameter对象:一个以作业参数01-01-2017开始,另一个以参数01-02-2017开始。
因此,jobinstance 可抽象为为:JobInstance =Job + identifying JobParameters. 。这允许开发人员通过控制传入的参数有效定义JobInstance。
Not all job parameters are required to contribute to the identification of a
JobInstance. By default, they do so. However, the framework also allows the
submission of a Job with parameters that do not contribute to the identity of a
JobInstance
2.2.1、JobExecution ######。
JobExecution作为一个job 一次执行任务的上下文。因为job 的一个instanceJob 有可能执行失败而多次执行,这样就需要一个上下文来管理同一个instanceJob 的多次执行。一次执行可能以失败或成功结束,只有JobExecution 执行成功了JobInstance才被认为是完成的。以前面描述的EndOfDay作业为例,考虑一个01-01-2017的JobInstance,它在第一次运行时失败。
如果再次使用与第一次运行(01-01-2017)相同的作业参数运行(01-01-2017),则新的
JobExecution被创建。然而,仍然只有一个JobInstance。JobInstance则纯粹是一个组织对象,他将多个JobExecution 组合一起。而实际运行期间的主要存储机制是JobExecution.
以上面EndOfDayJob 为例 在01-01-2017 9:00 开始执行任务,到9:30 任务失败。可以看到相关表的记录。
由于第一次执行失败后,任务将停止等待第二次重启(第二次重启将从失败的位置开始)。到01-02-2017 9:00 时候,第一次执行失败的任务将从失败的位置重新开始执行,而10-02-2017的任务也将开始执行,JobInstance被一个接一个地启动,除非两个作业因为同时访问相同的数据造,从而导致在数据库级别的锁定而阻塞。否则何时运行作业完全取决于调度程序。因为他们是分开的工作,spring
Batch 框架不会阻止它们并发地运行。(当试图运行相同的程序
当另一个已经在运行时,JobInstance会导致抛出一个JobExecutionAlreadyRunningException)。现观察相关表将有新的记录:
2.3、Step
Step是一个领域对象,它体现了批处理作业的独立的、连续的阶段。
因此,每个工作都完全由一个或多个步骤组成。步骤包含定义和控制实际批处理所需的所有信息。step 不是固定的,因为任何给定步骤的内容都由开发人员决定
。一个步骤可以是简单的,也可以是复杂的。一个简单的步骤可能会将数据从文件加载到数据库中,只需要很少或根本不需要代码(取决于所使用的实现)。更复杂的步骤可能有作为处理一部分应用的复杂业务规则。与作业一样(JobExecution),Step 也有这独立 StepExecution 存储这每一个step 的执行信息。如下图:
2.3.1、StepExecution
一个 StepExecution 表示执行step的一次执行。
每次运行一个step时都会创建一个新的StepExecution,类似于JobExecution。但是,如果一个步骤因为之前的步骤失败而没有执行,则不会为它持久化执行。只有当它的step真正开始时,才会创建StepExecution
StepExecution 用来表示每一个step 的执行。每个StepExecution都包含对其相应step和与JobExecution以及事务相关数据的引用,比如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个
ExecutionContext,它包含开发人员在批处理运行期间需要持久化的任何数据,例如重新启动所需的统计信息或状态信息。下表列出了用于的属性.
2.3.1、ExecutionContext
ExecutionContext 存储这 框架需要持久化和控制的键值对集合,以便开发人员存储作用域为StepExecution or a JobExecution的持久状态。对于那些熟悉Quartz的人来说,它与Quartz JobDataMap非常相似。他们的最好作用是在发生异常时为后续的重启做数据基础。
以读取文件为例,在处理单行时,框架定期在提交点持久化ExecutionContext。这样做允许ItemReader存储它的状态,以防在运行过程中发生致命错误,甚至电源中断。所需要做的就是将当前读取的行数放入上下文中,如下例所示,框架将完成其余持久化的工作。
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以上面的EndOfDay 为示例,假设有一个步骤,‘loadData’,加载一个文件到数据库。第一次运行失败后,元数据表的变化如下:
在前面的示例中,该步骤运行了30分钟,并处理了40,321个“片段”,这将表示此场景中文件中的行。这个值在框架每次提交之前更新,并且可以包含多个行,这些行对应于ExecutionContext中的条目。
为了在提交之前事件通知,需要实现StepListener(或ItemStream)
详细内容将在后面描述。还以上面EndOfDay示例,假定任务在第二天重新启动。当它重新启动时
上次运行的ExecutionContext从数据库中重新构造。打开ItemReader时,它可以检查上下文中是否有任何存储状态,并从那里初始化自己。
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在上面的代码运行之后,当前行是40,322,允许从它停止的地方重新开始。例如,如果一个文件包含处理订单,一个订单包含多个行,可能需要存储多个订单处理(这是不同于读取行数),因此可以将电子邮件发送的最后一步,订单处理的总数。该框架为开发人员处理存储这些内容,以便正确地使用一个JobInstance对其进行调整。很难知道是否应该使用现有的ExecutionContext。例如,使用
从上面的‘EndOfDay’示例中可以看出,当01-01再次运行时,框架意识到它是相同的JobInstance,并在单个步骤的基础上拉出
将ExecutionContext从数据库中取出,并将其(作为步骤执行的一部分)交给步骤本身。相反,对于01-02运行,框架认识到它是一个不同的实例,因此必须向步骤传递一个空上下文。框架为开发人员提供了许多这种类型的决定,以确保在正确的时间给他们状态。同样重要的是要注意,在任何给定时间,每一步执行只存在一个ExecutionContext。ExecutionContext的客户端应该小心,因为这会创建一个共享密钥空间。因此,在输入值时应该小心,以确保没有覆盖数据。然而,
Step在上下文中完全不存储数据,因此没有办法对框架产生负面影响
同样重要的是,每个JobExecution至少有一个执行上下文,每一步执行一个上下文。例如,考虑下面的代码片段
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
ecStep不等于ecJob。它们是两个不同的执行上下文。
作用域为step的元素将在该step的每个提交点保存,而作用域为Job则保存在
存在每一步step执行之间.
关于springbatch 数据库表可以在maven 引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
引入后就可以在下面路径下可以找个支持很多种数据库的sql
org/springframework/batch/core/
待更新。。。。。。。。。。。
参考:
https://docs.spring.io/spring-batch/4.1.x/reference/pdf/spring-batch-reference.pdf
http://www.importnew.com/26177.html
https://kimmking.gitbooks.io/springbatchreference/content/01_introduction/12.html
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/140482.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...