大家好,又见面了,我是你们的朋友全栈君。
Job Launcher 和Job Repository
对应着的Java接口分别是:JobLauncher和 JobRepository
JobLauncher.
packageorg.springframework.batch.core.launch;
(...)
publicinterface JobLauncher {
publicJobExecution run(Job job, JobParameters jobParameters)
throwsJobExecutionAlreadyRunningException,
JobRestartException,JobInstanceAlreadyCompleteException,
JobParametersInvalidException;
}
JobLauncher接口接受两个参数:Job和JobParameters
调用这个Job Launcher方法:可以通过java程序来通过JobLauncher来启动,也可以通过定时任务例如Quartz scheduler来启动.
JobRepository
JobRepository保持着所有Job执行的相关元数据,JobRepository的java接口如下:
packageorg.springframework.batch.core.repository;
(...)
publicinterface JobRepository {
booleanisJobInstanceExists(String jobName, JobParametersjobParameters);
JobExecutioncreateJobExecution(
StringjobName, JobParameters jobParameters)
throwsJobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException;
voidupdate(JobExecution jobExecution);
voidadd(StepExecution stepExecution);
voidupdate(StepExecution stepExecution);
voidupdateExecutionContext(StepExecution stepExecution);
voidupdateExecutionContext(JobExecution jobExecution);
StepExecutiongetLastStepExecution(JobInstance jobInstance,
StringstepName);
intgetStepExecutionCount(JobInstance jobInstance, StringstepName);
JobExecutiongetLastJobExecution(String jobName,
JobParametersjobParameters);
}
JobRepository是上述处理提供的一种持久化机制,它为JobLauncher,Job,和Step实例提供CRUD操作。
JobRepository用于存储任务执行的状态信息,比如什么时间点执行了什么任务、任务执行结果如何等等。框架提供了2种实现,一种是通过Map形式保存在内存中,当Java程序重启后任务信息也就丢失了,并且在分布式下无法获取其他节点的任务执行情况;另一种是保存在数据库中,并且将数据保存在下面6张表里:
BATCH_JOB_INSTANCE
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_EXECUTION
BATCH_STEP_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_STEP_EXECUTION_CONTEXT
Spring Batch框架的JobRepository支持主流的数据库:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。可爱的是,我司的Gauss数据库也是支持的,只不过需要稍加配置。
每个表的解析:http://blog.csdn.net/u011659172/article/details/50749534
一个job的启动通常是一个事件来进行触发的,我们会经常用到JobLauncher接口和JobParameters类,但是这个事件是可以来自任何地方的,例如一个系统的定时器;一个http请求到web控制器来启动job等这就是Spring Batch 和外界的介入点.
另外批量Job程序基本上都是用来处理各种数据的,所以上图中我们可以看到SpringBatch的出口都是在跟数据源进行打交道,这些数据源可以是任何类型,但是文件系统和数据库是最常用的,当然它也可以支持写消息给JMS.
JobExecution
importorg.springframework.batch.core.JobExecution;
jobParameters= new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString("filePath",smsFilePath)
.toJobParameters();
JobExecutionjobExecution =jobLauncher.run(sms2DBJob,jobParameters);
if(jobExecution.getStatus() ==BatchStatus.COMPLETED) {
returntrue;
}
Job Execution表示Job执行的句柄,一次Job执行可能成功也可能失败。只有Job Execution执行成功之后,对应的Job Instance才能被完成。
Job Execution对应的数据库表是:BATCH_JOB_EXECUTION,
CREATE TABLEBATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME NOT NULL,
START_TIME DATETIME DEFAULT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
JOB_CONFIGURATION_LOCATIONVARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreignkey (JOB_INSTANCE_ID)
referencesBATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
)ENGINE=InnoDB;
对应的java类是:org.springframework.batch.core.JobExecution。
status :BatchStatus对象表示执行状态。BatchStatus.STARTED表示运行时,BatchStatus.FAILED表示执行失败,BatchStatus. COMPLETED表示任务成功结束
JobParametersIncrementer主要用于JobOperator接口的startNextInstance等方法启动job的情况下。同一个Job在batch启动后被多次调用的场合,
startNextInstance方法将会非常有用,因为它将使用JobParametersIncrementer与Job绑定,创建一个新实例。因为JobParametersIncrementer有一个getNext方法
可以在此方法中为parameters添加一个自增的值,以区分不同的Job实例
,当然,这个值在job的其他的地方并不会用到,仅仅是为了标示不同JobInstance。当然SpringBatch框架也为我们提供了一个JobParametersIncrementer的实现类RunIdIncrementer
@Bean
publicJob product2DBJob() {
logger.info("product2DBJob");
return productJobBuilderFactory.get("product2DBJob") //create ajobbuilder instance
.incrementer(newRunIdIncrementer())
.repository(jobRepository) .listener(productJobCompletionNotificationListener())
.flow(productStep())
.end()
.build();
------------------------------------------
public Job build() {
FlowJobjob = new FlowJob();
job.setName(getName());
job.setFlow(flow);
super.enhance(job);
try{
job.afterPropertiesSet();
}
catch(Exception e) {
thrownew StepBuilderException(e);
}
returnjob;
}
}
@StepScope
主要用于 Spring Batch。 它本质上是一个lazy scope,告诉Spring在首次访问时才创建bean。 在本例中, 我们需要使用
step scope 是因为使用了job参数的 ” filePath“值,这个值在应用程序启动时是不存在的。 使用 stepscope 使Spring
Batch在创建这个bean时能够找到“filePath“值。
弹性:
@Bean
public Step userStep(){
System.out.println("userStep!!!!!");
return stepBuilderFactory.get("userStep")
.<User,Message>chunk(2)
.reader(userReader(filePath))
.processor(MessageProcessor())
.writer(MessageWriter())
// .faultTolerant()
// .retry(Exception.class) 不能跳过的错误
// .noRetry(ParseException.class)
// .retryLimit(5) 最多可以让一条数据重试5次
// .listener(newRetryFailuireItemListener())
// .skip(Exception.class)
// .skipLimit(10)
// .taskExecutor(newSimpleAsyncTaskExecutor())
// .throttleLimit(10)
// .transactionManager(transactionManager)
.build();
}
/**skip(Exception.class)
* skipLimit(10):跳过十次记录报错为Exception的记录,多余十次则Job失败
* ----
* retry(Exception.class)
* noRetry(ParseException.class)
* retryLimit(5):一条记录有可能过几毫秒就可以运行成功,所以一条记录可以重试5次,但是如果一条记录包ParseException错误也就是数据本身的字段数目不对或者其他的原因则
* 不会重试就相当于skiplimit+1,当报Exception时则重试最多五次,不成功然后跳过,这个需要和skiplimit联合使用不然job会失败
*/
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/140199.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...