SpringBoot整合SpringBatch
一、引入依赖
pom.xml
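<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
    </parent>
    <groupId>com.qiang.mybaties.plus.test.batch</groupId>
    <artifactId>MybatiesPlusTestBatch</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc7</artifactId>
            <version>12.1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.6</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>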
二、修改配置文件
application.yml
# 配置端口
server:
  port: 8080
spring:
  redis:
    # IP地址
    host: 192.168.17.101
    # 端口
    port: 6379
    # 密码
    password: 123456
    # 数据库
    database: 0
  datasource:
    url: jdbc:oracle:thin:@192.168.17.101:1522:XE
    username: BATCH
    password: 123456
    driver-class-name: oracle.jdbc.OracleDriver
    type: com.alibaba.druid.pool.DruidDataSource
    # 指定数据库表的位置
    schema: classpath:/org/springframework/batch/core/schema-oracle10g.sql
  # 启动时创建表,后续可以将always修改成never
  batch:
    initialize-schema: always
  main:
    allow-bean-definition-overriding: true
# mybatis-plus相关配置
mybatis-plus:
  # xml扫描,多个目录用逗号或者分号分隔(告诉 Mapper 所对应的 XML 文件位置)
  mapper-locations: classpath:mapper/*.xml
  # 以下配置均有默认值,可以不设置
  global-config:
    db-config:
      #主键类型 AUTO:"数据库ID自增" INPUT:"用户输入ID",ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID";
      id-type: auto
      #字段策略 IGNORED:"忽略判断" NOT_NULL:"非 NULL 判断" NOT_EMPTY:"非空判断"
      field-strategy: NOT_EMPTY
      #数据库类型
      db-type: ORACLE
  configuration:
    # 是否开启自动驼峰命名规则映射:从数据库列名到Java属性驼峰命名的类似映射
    map-underscore-to-camel-case: true
    # 如果查询结果中包含空值的列,MyBatis 默认不会映射这个字段;设置为true后,即使值为null也会调用setter(或Map的put)
    call-setters-on-nulls: true
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    # 一级缓存配置 一级缓存是本地或者说局部缓存,它不能被关闭,只能配置缓存范围。SESSION 或者 STATEMENT。
    local-cache-scope: session
    # 二级缓存总开关
    cache-enabled: true
三、启动类开启注解
@EnableBatchProcessing
package com.qiang.mybaties.plus.test.batch;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
/**
 * Spring Boot entry point of the Spring Batch demo application.
 *
 * <p>In addition to Spring Boot auto-configuration, the class enables MyBatis mapper
 * scanning for the {@code dao} package, Spring's caching annotations and the
 * Spring Batch infrastructure ({@code @EnableBatchProcessing}).
 *
 * @author: 小强崽
 * @create: 2020/11/27 9:46
 **/
@EnableBatchProcessing
@EnableCaching
@MapperScan({"com.qiang.mybaties.plus.test.batch.dao"})
@SpringBootApplication
public class MybatiesPlusBatchApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MybatiesPlusBatchApplication.class, args);
    }
}
四、测试是否成功
BatchHelloWorldJob
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * First Spring Batch example: one job made of a single tasklet step that prints a greeting.
 *
 * @author: 小强崽
 * @create: 2020/11/27 14:32
 **/
@Configuration
public class BatchHelloWorldJob {

    /** Factory used to create jobs. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create steps. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code helloWorld} job, which consists of {@link #step()} only.
     *
     * @return the job definition
     */
    @Bean
    public Job helloWorld() {
        Step greetingStep = step();
        return jobBuilderFactory.get("helloWorld").start(greetingStep).build();
    }

    /**
     * Defines the step named {@code step}; its tasklet prints "Hello World!".
     *
     * @return the step definition
     */
    @Bean
    public Step step() {
        return stepBuilderFactory
                .get("step")
                .tasklet((contribution, context) -> {
                    System.out.println("Hello World!");
                    // Status reported back to the framework: this tasklet is done.
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
结果执行成功
同时数据库初始化了表
五、任务重复执行
当任务执行完成一次后,再次启动时该Step不会重复执行,日志中会出现如下StepExecution提示:
Step already complete or not restartable, so no action to execute: StepExecution
在测试阶段希望程序重复执行。
package com.qiang.mybaties.plus.test.batch.controller;
import com.qiang.mybaties.plus.test.batch.job.BatchHelloWorldJob;
import com.qiang.mybaties.plus.test.batch.response.ResponseResult;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * REST controller that launches batch jobs on request.
 *
 * @author: 小强崽
 * @create: 2020/11/27 14:16
 **/
@RestController
@RequestMapping("/call/batch")
public class CallBatchJobController {

    @Autowired
    private BatchHelloWorldJob helloWorldJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Runs the {@code helloWorld} job and returns the batch status of that execution.
     *
     * <p>The current time is supplied as the {@code date} job parameter. Because the value
     * differs on every request, the parameters differ too, so the job can be run repeatedly.
     *
     * @return a success response wrapping the {@link BatchStatus} of the execution
     * @throws JobParametersInvalidException       propagated from {@link JobLauncher#run}
     * @throws JobExecutionAlreadyRunningException propagated from {@link JobLauncher#run}
     * @throws JobRestartException                 propagated from {@link JobLauncher#run}
     * @throws JobInstanceAlreadyCompleteException propagated from {@link JobLauncher#run}
     */
    @GetMapping("/hello/world")
    public ResponseResult helloWorld() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParametersBuilder parametersBuilder = new JobParametersBuilder();
        parametersBuilder.addDate("date", new Date());
        JobParameters uniqueParameters = parametersBuilder.toJobParameters();

        JobExecution execution = jobLauncher.run(helloWorldJob.helloWorld(), uniqueParameters);
        return ResponseResult.success(execution.getStatus());
    }
}
六、SpringBatch结构
Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情。 一个Job包含很多Step,step就是每个job要执行的单个步骤。
Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于可以重复利用的东西。 然后是Chunk,chunk就是数据块,需要定义多大的数据量是一个chunk。
Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环 这个流程,直到批处理数据完成。
七、使用Lambda创建Step
Job名称和Step名称不能与其它的Job和Step重名。Job名也不能跟类名一样(不区分大小写),否则会报错,执行失败。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Shows how to define a step's tasklet with a lambda expression.
 *
 * @author: 小强崽
 * @create: 2020/12/1 15:34
 **/
@Configuration
public class BatchLambdaJob {

    /** Factory used to create jobs. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create steps. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code lambdaJob} job, which runs {@link #lambdaStep()} only.
     *
     * @return the job definition
     */
    @Bean
    public Job lambdaJob() {
        Step onlyStep = lambdaStep();
        return jobBuilderFactory.get("lambdaJob").start(onlyStep).build();
    }

    /**
     * Builds the {@code lambdaStep} step; the tasklet prints one line and finishes.
     *
     * @return the step definition
     */
    public Step lambdaStep() {
        return stepBuilderFactory
                .get("lambdaStep")
                .tasklet((contribution, context) -> {
                    System.out.println("lambdaStep执行步骤....");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
八、多任务执行
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * A job that runs three steps one after another.
 *
 * @author: 小强崽
 * @create: 2020/12/1 15:46
 **/
@Component
public class BatchMultiJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code multiJob} job: step 1, then step 2, then step 3.
     *
     * @return the job definition
     */
    @Bean
    public Job multiJob() {
        return jobBuilderFactory
                .get("multiJob")
                .start(multiStep1())
                .next(multiStep2())
                .next(multiStep3())
                .build();
    }

    private Step multiStep1() {
        return printingStep("multiStep1", "multiStep1执行步骤一操作。。。");
    }

    private Step multiStep2() {
        return printingStep("multiStep2", "multiStep2执行步骤二操作。。。");
    }

    private Step multiStep3() {
        return printingStep("multiStep3", "multiStep3执行步骤三操作。。。");
    }

    /**
     * Builds a tasklet step that prints one message and then finishes.
     *
     * @param stepName name given to the step
     * @param message  line written to standard output when the step runs
     * @return the step definition
     */
    private Step printingStep(String stepName, String message) {
        return stepBuilderFactory
                .get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * A multi-step job in which each step runs only if the previous one exited with COMPLETED.
 *
 * @author: 小强崽
 * @create: 2020/12/1 16:04
 **/
@Component
public class BatchMultiStatusJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code multiStatusJob} job: step 1, on COMPLETED step 2, on COMPLETED step 3.
     *
     * @return the job definition
     */
    @Bean
    public Job multiStatusJob() {
        // The factory methods below return a new Step on every call. Build each step exactly
        // once so that to(...) and the matching from(...) refer to the same Step instance.
        Step first = multiStatusStep1();
        Step second = multiStatusStep2();
        Step third = multiStatusStep3();

        return jobBuilderFactory.get("multiStatusJob")
                .start(first)
                .on(ExitStatus.COMPLETED.getExitCode()).to(second)
                .from(second)
                .on(ExitStatus.COMPLETED.getExitCode()).to(third)
                .from(third)
                .end()
                .build();
    }

    /** Builds step 1, which prints a message and finishes. */
    private Step multiStatusStep1() {
        return stepBuilderFactory.get("multiStatusStep1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("multiStatusStep1执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 2, which prints a message and finishes. */
    private Step multiStatusStep2() {
        return stepBuilderFactory.get("multiStatusStep2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("multiStatusStep2执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 3, which prints a message and finishes. */
    private Step multiStatusStep3() {
        return stepBuilderFactory.get("multiStatusStep3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("multiStatusStep3执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}
当multiStatusStep1()的退出状态为COMPLETED(完成)时执行multiStatusStep2(),以此类推。ExitStatus类包含以下几种状态。
/*
* Copyright 2006-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core;
import org.springframework.util.StringUtils;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
/**
 * Value object used to carry information about the status of a
 * job or step execution.
 *
 * ExitStatus is immutable and therefore thread-safe.
 *
 * NOTE: this listing is an abridged excerpt. Only the predefined status
 * constants are shown; the constructor they call and the remaining members
 * of the class are not part of the listing.
 *
 * @author Dave Syer
 *
 */
@SuppressWarnings("serial")
public class ExitStatus implements Serializable, Comparable {
/**
 * Convenient constant value representing unknown state - assumed not
 * continuable.
 */
public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");
/**
 * Convenient constant value representing continuable state where processing
 * is still taking place, so no further action is required. Used for
 * asynchronous execution scenarios where the processing is happening in
 * another thread or process and the caller is not required to wait for the
 * result.
 */
public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");
/**
 * Convenient constant value representing finished processing.
 */
public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");
/**
 * Convenient constant value representing job that did no processing (e.g.
 * because it was already complete).
 */
public static final ExitStatus NOOP = new ExitStatus("NOOP");
/**
 * Convenient constant value representing finished processing with an error.
 */
public static final ExitStatus FAILED = new ExitStatus("FAILED");
/**
 * Convenient constant value representing finished processing with
 * interrupted status.
 */
public static final ExitStatus STOPPED = new ExitStatus("STOPPED");
}
九、Flow串行的用法
一个Flow包含多个Step,Job流程中包含Flow类型的时候需要在build()方法前调用end()方法。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Shows a job that runs a {@link Flow} (a group of steps) followed by a further step.
 *
 * @author: 小强崽
 * @create: 2020/12/1 16:26
 **/
@Component
public class BatchFlowJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code flowJob} job: the flow from {@link #flow()} first, then step 3.
     * Because the job starts with a flow, {@code end()} is called before {@code build()}.
     *
     * @return the job definition
     */
    @Bean
    public Job flowJob() {
        return jobBuilderFactory.get("flowJob")
                .start(flow())
                .next(flowStep3())
                .end()
                .build();
    }

    /** Builds step 1, which prints a message and finishes. */
    private Step flowStep1() {
        return stepBuilderFactory.get("flowStep1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("flowStep1执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 2, which prints a message and finishes. */
    private Step flowStep2() {
        return stepBuilderFactory.get("flowStep2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("flowStep2执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 3, which prints a message and finishes. */
    private Step flowStep3() {
        return stepBuilderFactory.get("flowStep3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("flowStep3执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /**
     * Creates a flow named {@code flow} that runs step 1 and then step 2.
     *
     * @return the flow definition
     */
    private Flow flow() {
        // The type argument is required: with a raw FlowBuilder, build() yields Object, not Flow.
        return new FlowBuilder<Flow>("flow")
                .start(flowStep1())
                .next(flowStep2())
                .build();
    }
}
十、Split并行的用法
Split任务并行处理。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Component;
/**
 * Shows how to run two flows side by side with {@code split(...)}.
 *
 * @author: 小强崽
 * @create: 2020/12/1 16:47
 **/
@Component
public class BatchSplitJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code splitJob} job: flow 1 and flow 2 are combined through a split
     * that is given a {@link SimpleAsyncTaskExecutor}.
     *
     * @return the job definition
     */
    @Bean
    public Job splitJob() {
        return jobBuilderFactory.get("splitJob")
                .start(splitFlow1())
                .split(new SimpleAsyncTaskExecutor()).add(splitFlow2())
                .end()
                .build();
    }

    /** Builds step 1, which prints a message and finishes. */
    private Step splitStep1() {
        return stepBuilderFactory.get("splitStep1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("splitStep1执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 2, which prints a message and finishes. */
    private Step splitStep2() {
        return stepBuilderFactory.get("splitStep2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("splitStep2执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 3, which prints a message and finishes. */
    private Step splitStep3() {
        return stepBuilderFactory.get("splitStep3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("splitStep3执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /**
     * Creates flow 1, which runs step 1 and then step 2.
     *
     * @return the flow definition
     */
    private Flow splitFlow1() {
        // The type argument is required: with a raw FlowBuilder, build() yields Object, not Flow.
        return new FlowBuilder<Flow>("splitFlow1")
                .start(splitStep1())
                .next(splitStep2())
                .build();
    }

    /**
     * Creates flow 2, which runs step 3 only.
     *
     * @return the flow definition
     */
    private Flow splitFlow2() {
        return new FlowBuilder<Flow>("splitFlow2")
                .start(splitStep3())
                .build();
    }
}
结果看到任务不是串行处理的,而是异步执行,也就是并行处理。
十一、Decider决策器的使用
日期决策器,判断今天是周末还是工作日。
package com.qiang.mybaties.plus.test.batch.decider;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.stereotype.Component;
import java.time.DayOfWeek;
import java.time.LocalDate;
/**
 * Decider that routes a job according to whether today is a weekend day or a working day.
 *
 * @author: 小强崽
 * @create: 2020/12/1 16:55
 **/
@Component
public class DateDecider implements JobExecutionDecider {

    /**
     * Prints today's day of week and reports {@code "weekend"} for Saturday or Sunday,
     * {@code "workingDay"} for any other day.
     *
     * @param jobExecution  current job execution (not used)
     * @param stepExecution execution of the preceding step (not used)
     * @return a status named {@code "weekend"} or {@code "workingDay"}
     */
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        DayOfWeek today = LocalDate.now().getDayOfWeek();
        System.out.println("今天是: " + today);

        boolean weekend = today == DayOfWeek.SATURDAY || today == DayOfWeek.SUNDAY;
        return new FlowExecutionStatus(weekend ? "weekend" : "workingDay");
    }
}
首先执行deciderStep1(),然后使用自定义的日期决策器,如果返回周末weekend那就执行deciderStep2(),如果返回工作日workingDay那就执行deciderStep3(),无论deciderStep3()的结果是什么都会执行deciderStep4()。
package com.qiang.mybaties.plus.test.batch.job;
import com.qiang.mybaties.plus.test.batch.decider.DateDecider;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Shows how a {@link DateDecider} chooses which step a job runs next.
 *
 * @author: 小强崽
 * @create: 2020/12/1 17:04
 **/
@Component
public class BatchDeciderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DateDecider dateDecider;

    /**
     * Defines the {@code deciderJob} job.
     *
     * <p>Step 1 runs first, then the date decider is consulted: {@code "weekend"} leads to
     * step 2, {@code "workingDay"} leads to step 3. Whatever step 3's outcome, step 4 follows it.
     *
     * @return the job definition
     */
    @Bean
    public Job deciderJob() {
        // deciderStep3() returns a new Step on every call. Build it once so that to(...)
        // and the later from(...) refer to the same Step instance.
        Step workingDayStep = deciderStep3();

        return jobBuilderFactory.get("deciderJob")
                .start(deciderStep1())
                .next(dateDecider)
                .from(dateDecider).on("weekend").to(deciderStep2())
                .from(dateDecider).on("workingDay").to(workingDayStep)
                .from(workingDayStep).on("*").to(deciderStep4())
                .end()
                .build();
    }

    /** Builds step 1, which prints a message and finishes. */
    private Step deciderStep1() {
        return stepBuilderFactory.get("deciderStep1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("deciderStep1执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 2 (weekend branch), which prints a message and finishes. */
    private Step deciderStep2() {
        return stepBuilderFactory.get("deciderStep2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("deciderStep2执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 3 (working-day branch), which prints a message and finishes. */
    private Step deciderStep3() {
        return stepBuilderFactory.get("deciderStep3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("deciderStep3执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /** Builds step 4 (runs after step 3), which prints a message and finishes. */
    private Step deciderStep4() {
        return stepBuilderFactory.get("deciderStep4")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("deciderStep4执行步骤四操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}
十一、任务嵌套
通过childJobOne()和childJobTwo()方法创建了两个任务Job。关键在于childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,通过JobStepBuilder构建了一个名称为childJobOneStep的Step,顾名思义,它是一个任务型Step的构造工厂,可以将任务转换为“特殊”的步骤。在构建过程中,还需要传入任务执行器JobLauncher、任务仓库JobRepository和事务管理器PlatformTransactionManager。将任务转换为特殊的步骤后,将其赋给父任务parentJob即可。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Nested jobs: a parent job whose steps each launch a child job.
 *
 * @author: 小强崽
 * @create: 2020/12/1 17:22
 **/
@Component
public class BatchNestedJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher;

    /** Job repository handed to each job-wrapping step. */
    @Autowired
    private JobRepository jobRepository;

    /** Transaction manager handed to each job-wrapping step. */
    @Autowired
    private PlatformTransactionManager platformTransactionManager;

    /**
     * Defines the parent job: the step wrapping child job one, then the step wrapping child job two.
     *
     * @return the parent job definition
     */
    @Bean
    public Job parentJob() {
        return jobBuilderFactory
                .get("parentJob")
                .start(childJobOneStep())
                .next(childJobTwoStep())
                .build();
    }

    /**
     * Wraps {@link #childJobOne()} in a step named {@code childJobOneStep}.
     *
     * @return the job-wrapping step
     */
    private Step childJobOneStep() {
        return jobAsStep("childJobOneStep", childJobOne());
    }

    /**
     * Wraps {@link #childJobTwo()} in a step named {@code childJobTwoStep}.
     *
     * @return the job-wrapping step
     */
    private Step childJobTwoStep() {
        return jobAsStep("childJobTwoStep", childJobTwo());
    }

    /**
     * Child job one: a single tasklet step that prints a message.
     *
     * @return the child job definition
     */
    private Job childJobOne() {
        return singleStepJob("childJobOne", "childJobOneStep", "子任务一执行步骤。。。");
    }

    /**
     * Child job two: a single tasklet step that prints a message.
     *
     * @return the child job definition
     */
    private Job childJobTwo() {
        return singleStepJob("childJobTwo", "childJobTwoStep", "子任务二执行步骤。。。");
    }

    /**
     * Turns a job into a step by means of {@link JobStepBuilder}, supplying the injected
     * launcher, repository and transaction manager.
     *
     * @param stepName name of the resulting step
     * @param job      job the step should run
     * @return the job-wrapping step
     */
    private Step jobAsStep(String stepName, Job job) {
        JobStepBuilder builder = new JobStepBuilder(new StepBuilder(stepName));
        return builder
                .job(job)
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    /**
     * Builds a job consisting of one tasklet step that prints a message and finishes.
     *
     * @param jobName  name of the job
     * @param stepName name of the job's only step
     * @param message  line written to standard output when the step runs
     * @return the job definition
     */
    private Job singleStepJob(String jobName, String stepName, String message) {
        Step printingStep = stepBuilderFactory
                .get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
        return jobBuilderFactory.get(jobName).start(printingStep).build();
    }
}
十二、读取数据
Spring Batch读取数据通过ItemReader接口的实现类来完成,包括FlatFileItemReader文本数据读取、StaxEventItemReader XML文件数据读取、JsonItemReader JSON文件数据读取、JdbcPagingItemReader数据库分页数据读取等实现,更多请参考官网。
12.1 简单的数据读取
自定义一个ItemReader的实现类,实现简单数据的读取,BatchSimpleItemReader。
package com.qiang.mybaties.plus.test.batch.item.reader.job.impl;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import java.util.Iterator;
import java.util.List;
/**
 * Minimal {@link ItemReader} that hands out the elements of an in-memory list one by one.
 * The type argument of {@code ItemReader} is the type of the items being read.
 *
 * @author: 小强崽
 * @create: 2020/12/2 10:24
 **/
public class BatchSimpleItemReader implements ItemReader<String> {

    /** Cursor over the data supplied at construction time. */
    private final Iterator<String> iterator;

    /**
     * Creates a reader over the given data.
     *
     * @param data items to hand out, in list order
     */
    public BatchSimpleItemReader(List<String> data) {
        this.iterator = data.iterator();
    }

    /**
     * Returns the next item, one per call.
     *
     * @return the next item, or {@code null} once every item has been returned
     *         ({@code null} signals that reading is complete)
     * @throws Exception                     declared by the interface; not thrown by this implementation
     * @throws UnexpectedInputException      declared by the interface; not thrown by this implementation
     * @throws ParseException                declared by the interface; not thrown by this implementation
     * @throws NonTransientResourceException declared by the interface; not thrown by this implementation
     */
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return iterator.hasNext() ? iterator.next() : null;
    }
}
测试代码BatchSimpleItemReaderJob,使用chunk()方法。chunk字面上是“块”的意思,可以简单理解为数据块;chunk()的泛型参数(本例为<String, String>)用于指定读取的数据类型和输出的数据类型,方法入参指定数据块的大小,例如指定为2时表示每读取2条数据后做一次输出处理。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.job.impl.BatchSimpleItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
 * Job that reads strings through {@link BatchSimpleItemReader} and prints them chunk by chunk.
 *
 * @author: 小强崽
 * @create: 2020/12/2 10:22
 **/
@Component
public class BatchSimpleItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code simpleItemReaderJob} job, which runs {@link #simpleItemReaderStep()} only.
     *
     * @return the job definition
     */
    @Bean
    public Job simpleItemReaderJob() {
        return jobBuilderFactory
                .get("simpleItemReaderJob")
                .start(simpleItemReaderStep())
                .build();
    }

    /**
     * Builds a chunk-oriented step with a chunk size of 2 whose writer prints every item.
     *
     * @return the step definition
     */
    public Step simpleItemReaderStep() {
        return stepBuilderFactory
                .get("simpleItemReaderStep")
                // <input type, output type>: both are String here
                .<String, String>chunk(2)
                .reader(simpleItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    /**
     * Creates the reader over a fixed list of four strings.
     *
     * @return the item reader
     */
    public ItemReader<String> simpleItemReader() {
        List<String> strings = Arrays.asList("存款", "余额", "资金", "冻结");
        return new BatchSimpleItemReader(strings);
    }
}
12.2 文本数据读取
通过FlatFileItemReader
实现文本数据读取。
准备数据文件,在resources目录下新建file文件。
// 演示文件数据读取
1,11,12,13
2,21,22,23
3,31,32,33
4,41,42,43
5,51,52,53
6,61,62,63
file的数据是一行一行以逗号分隔的数据(在批处理业务中,文本类型的数据文件一般都是有一定规律的)。在文本数据读取的过程中,我们需要将读取的数据转换为POJO对象存储,所以我们需要创建一个与之对应的POJO对象。新建TestVo
类,因为file文本中的一行数据经过逗号分隔后为1、11、12、13,所以我们创建的与之对应的POJO TestVo包含4个属性id、field1、field2和field3。
package com.qiang.mybaties.plus.test.batch.item.reader.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Data;
import java.io.Serializable;
/**
 * Entity with an id and three data fields; accessors are generated by Lombok's {@code @Data}.
 *
 * @author 小强崽
 * @create: 2020-12-04 16:33:11
 */
@Data
public class TestVo extends Model<TestVo> {

    /** Identifier. */
    private Integer id;

    /** Field 1. */
    private Object field1;

    /** Field 2. */
    private Object field2;

    /** Field 3. */
    private Object field3;

    /**
     * Returns the primary-key value.
     *
     * @return the value of {@code id}
     */
    @Override
    protected Serializable pkVal() {
        return this.id;
    }
}
测试代码。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.FileVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
/**
 * Job that reads comma-separated lines from the classpath resource {@code file},
 * maps each line to a {@link FileVo} and prints the result.
 *
 * @author: 小强崽
 * @create: 2020-12-06 22:36
 */
@Component
public class BatchFileItemReaderJob {

    /** Factory used to create jobs. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create steps. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code fileItemReaderJob} job, which runs a single chunk-oriented step.
     *
     * @return the job definition
     */
    @Bean
    public Job fileItemReaderJob() {
        return jobBuilderFactory.get("fileItemReaderJob")
                .start(fileItemReaderJobStep())
                .build();
    }

    /**
     * Builds a chunk-oriented step with a chunk size of 2 whose writer prints every item.
     *
     * @return the step definition
     */
    private Step fileItemReaderJobStep() {
        return stepBuilderFactory.get("fileItemReaderJobStep")
                .<FileVo, FileVo>chunk(2)
                .reader(fileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    /**
     * Configures a {@link FlatFileItemReader} for the {@code file} resource.
     *
     * @return the configured reader
     */
    private ItemReader<FileVo> fileItemReader() {
        FlatFileItemReader<FileVo> reader = new FlatFileItemReader<>();
        // Location of the input file on the classpath
        reader.setResource(new ClassPathResource("file"));
        // Skip the first line of the file
        reader.setLinesToSkip(1);

        // Tokenizer that splits each line on a fixed delimiter; the no-arg constructor is
        // used here, and a different delimiter can be passed to the other constructor.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // Names of the tokens, comparable to a table header
        tokenizer.setNames("id", "field1", "field2", "field3");

        // Maps each line to a FileVo object
        DefaultLineMapper<FileVo> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        // How the tokenized text is converted into the POJO
        mapper.setFieldSetMapper(fieldSet -> {
            FileVo fileVo = new FileVo();
            fileVo.setId(fieldSet.readInt("id"));
            fileVo.setField1(fieldSet.readString("field1"));
            fileVo.setField2(fieldSet.readString("field2"));
            fileVo.setField3(fieldSet.readString("field3"));
            return fileVo;
        });
        reader.setLineMapper(mapper);
        return reader;
    }
}
测试结果。
12.3 数据库数据读取
创建数据库表。
CREATE TABLE "ITEMREADER"."TEST" (
"ID" NUMBER(10) NOT NULL ,
"FIELD1" VARCHAR2(255) ,
"FIELD2" VARCHAR2(255) ,
"FIELD3" VARCHAR2(255) ,
PRIMARY KEY ("ID")
);
COMMENT ON COLUMN "ITEMREADER"."TEST"."ID" IS '编号';
COMMENT ON COLUMN "ITEMREADER"."TEST"."FIELD1" IS '字段1';
COMMENT ON COLUMN "ITEMREADER"."TEST"."FIELD2" IS '字段2';
COMMENT ON COLUMN "ITEMREADER"."TEST"."FIELD3" IS '字段3';
插入数据。
insert all
into TEST(id,field1,field2,field3) values(1,'11','12','13')
into TEST(id,field1,field2,field3) values(2,'21','22','23')
into TEST(id,field1,field2,field3) values(3,'31','32','33')
into TEST(id,field1,field2,field3) values(4,'41','42','43')
into TEST(id,field1,field2,field3) values(5,'51','52','53')
into TEST(id,field1,field2,field3) values(6,'61','62','63')
select 1 from dual;
创建实体TestVo。
package com.qiang.mybaties.plus.test.batch.item.reader.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Data;
import java.io.Serializable;
/**
 * Entity with an id and three data fields; accessors are generated by Lombok's {@code @Data}.
 *
 * @author 小强崽
 * @create: 2020-12-04 16:33:11
 */
@Data
public class TestVo extends Model<TestVo> {

    /** Identifier. */
    private Integer id;

    /** Field 1. */
    private Object field1;

    /** Field 2. */
    private Object field2;

    /** Field 3. */
    private Object field3;

    /**
     * Returns the primary-key value.
     *
     * @return the value of {@code id}
     */
    @Override
    protected Serializable pkVal() {
        return this.id;
    }
}
创建BatchDataSourceItemReaderJob
,dataSourceItemReader()
方法中的主要步骤就是:通过JdbcPagingItemReader
设置对应的数据源,然后设置数据量、获取数据的sql语句、排序规则和查询结果与POJO的映射规则等。方法末尾之所以需要调用JdbcPagingItemReader
的afterPropertiesSet()
方法是因为需要设置JDBC模板。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.TestVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.OraclePagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
/**
 * Job that pages through the {@code TEST} table with a {@link JdbcPagingItemReader},
 * maps each row to a {@link TestVo} and prints the result.
 *
 * @author: 小强崽
 * @create: 2020/12/4 15:57
 **/
@Component
public class BatchDataSourceItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Injected data source the reader queries. */
    @Autowired
    private DataSource dataSource;

    /**
     * Defines the {@code dataSourceItemReaderJob} job, which runs a single chunk-oriented step.
     *
     * @return the job definition
     * @throws Exception if the reader cannot be initialised
     */
    @Bean
    public Job dataSourceItemReaderJob() throws Exception {
        return jobBuilderFactory.get("dataSourceItemReaderJob")
                .start(step())
                .build();
    }

    /**
     * Builds a chunk-oriented step with a chunk size of 2 whose writer prints every item.
     *
     * @return the step definition
     * @throws Exception if the reader cannot be initialised
     */
    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestVo, TestVo>chunk(2)
                .reader(dataSourceItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    /**
     * Configures the paging reader: data source, fetch and page size, query, sort order and
     * row mapping.
     *
     * @return the initialised reader
     * @throws Exception propagated from {@code afterPropertiesSet()}
     */
    private ItemReader<TestVo> dataSourceItemReader() throws Exception {
        JdbcPagingItemReader<TestVo> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        // Number of rows fetched at a time
        reader.setFetchSize(5);
        // Number of rows per page
        reader.setPageSize(5);

        // Query: select id,field1,field2,field3 from TEST (Oracle paging provider)
        OraclePagingQueryProvider oraclePagingQueryProvider = new OraclePagingQueryProvider();
        oraclePagingQueryProvider.setSelectClause("id,field1,field2,field3");
        oraclePagingQueryProvider.setFromClause("from TEST");

        // Convert each row into a TestVo; columns are read by position
        reader.setRowMapper((resultSet, rowNum) -> {
            TestVo testVo = new TestVo();
            testVo.setId(resultSet.getInt(1));
            testVo.setField1(resultSet.getString(2));
            testVo.setField2(resultSet.getString(3));
            testVo.setField3(resultSet.getString(4));
            return testVo;
        });

        // Sort by id, ascending
        Map<String, Order> sort = new HashMap<>(1);
        sort.put("id", Order.ASCENDING);
        oraclePagingQueryProvider.setSortKeys(sort);
        reader.setQueryProvider(oraclePagingQueryProvider);

        // Finishes initialisation of the reader (e.g. its JDBC template) before first use
        reader.afterPropertiesSet();
        return reader;
    }
}
测试结果
12.4 XML文件读取
Spring Batch借助Spring OXM可以轻松地实现xml格式数据文件读取。在resources目录下新建file.xml。
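<?xml version="1.0" encoding="utf-8" ?>
<tests>
    <test>
        <id>1</id>
        <field1>11</field1>
        <field2>12</field2>
        <field3>13</field3>
    </test>
    <test>
        <id>2</id>
        <field1>21</field1>
        <field2>22</field2>
        <field3>23</field3>
    </test>
    <test>
        <id>3</id>
        <field1>31</field1>
        <field2>32</field2>
        <field3>33</field3>
    </test>
    <test>
        <id>4</id>
        <field1>41</field1>
        <field2>42</field2>
        <field3>43</field3>
    </test>
    <test>
        <id>5</id>
        <field1>51</field1>
        <field2>52</field2>
        <field3>53</field3>
    </test>
    <test>
        <id>6</id>
        <field1>61</field1>
        <field2>62</field2>
        <field3>63</field3>
    </test>
</tests>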
xml文件内容由一组一组的<test>标签组成,每个<test>标签又包含四组子标签(id、field1、field2、field3),标签名称和XmlVo实体类属性一一对应。准备好xml文件后,我们在pom中引入spring-oxm依赖。
// Plain data holder with a numeric id and three String fields.
// Lombok's @Data supplies the accessors, equals/hashCode and toString.
// NOTE(review): the surrounding text and the XML reader job refer to XmlVo,
// while this snippet declares FileVo — confirm which class name is intended.
@Data
public class FileVo {
// Record identifier.
private int id;
// First data field.
private String field1;
// Second data field.
private String field2;
// Third data field.
private String field3;
}
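<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.11.1</version>
</dependency>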
测试代码。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.XmlVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Job configuration that reads {@code file.xml} from the classpath, converts
 * every {@code test} element into an {@link XmlVo} and prints it to standard output.
 *
 * @author: 小强崽
 * @create: 2020/12/8 10:11
 * @description: reading an XML file
 **/
@Component
public class BatchXmlFileItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Declares the job, which consists of the single XML-reading step.
     *
     * @return the job registered under the name "xmlFileItemReaderJob"
     */
    @Bean
    public Job xmlFileItemReaderJob() {
        return jobBuilderFactory.get("xmlFileItemReaderJob")
                .start(xmlFileItemReaderStep())
                .build();
    }

    /**
     * Builds the chunk-oriented step: items are read from the XML file and
     * handed to the writer two at a time, which prints each of them.
     *
     * @return the configured step
     */
    private Step xmlFileItemReaderStep() {
        return stepBuilderFactory.get("xmlFileItemReaderStep")
                // Explicit type arguments keep reader and writer typed as XmlVo.
                .<XmlVo, XmlVo>chunk(2)
                .reader(xmlFileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    /**
     * Creates the StAX-based reader together with the XStream marshaller that
     * turns each XML fragment into an {@link XmlVo}.
     *
     * @return the configured reader
     */
    private ItemReader<XmlVo> xmlFileItemReader() {
        StaxEventItemReader<XmlVo> reader = new StaxEventItemReader<>();
        // XML file to read, resolved from the classpath.
        reader.setResource(new ClassPathResource("file.xml"));
        // Name of the element that marks one record (fragment) in the file.
        reader.setFragmentRootElementName("test");

        // Alias the "test" element to XmlVo so XStream knows the target type.
        Map<String, Class<?>> aliases = new HashMap<>(1);
        aliases.put("test", XmlVo.class);
        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        reader.setUnmarshaller(marshaller);
        return reader;
    }
}
12.5 JSON数据读取
在resources目录下新建file.json文件。
[
{
"id": 1,
"field1": "11",
"field2": "12",
"field3": "13"
},
{
"id": 2,
"field1": "21",
"field2": "22",
"field3": "23"
},
{
"id": 3,
"field1": "31",
"field2": "32",
"field3": "33"
}
]
JSON对象属性和JsonVo对象属性一一对应。
package com.qiang.mybaties.plus.test.batch.item.reader.entity;
import lombok.Data;
/**
 * Data holder for one JSON object of file.json; its properties correspond
 * one-to-one to the JSON attributes id, field1, field2 and field3.
 *
 * @author: 小强崽
 * @create: 2020/12/8 10:39
 * @description: JSON object
 **/
@Data
public class JsonVo {
// Record identifier ("id" in the JSON object).
private int id;
// Value of the "field1" attribute.
private String field1;
// Value of the "field2" attribute.
private String field2;
// Value of the "field3" attribute.
private String field3;
}
测试代码。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.JsonVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
/**
 * Job configuration that reads {@code file.json} from the classpath, converts
 * every JSON object into a {@link JsonVo} and prints it to standard output.
 *
 * @author: 小强崽
 * @create: 2020/12/8 10:43
 * @description: reading a JSON file
 **/
@Component
public class BatchJsonFileItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Declares the job, which consists of the single JSON-reading step.
     *
     * @return the job registered under the name "jsonFileItemReaderJob"
     */
    @Bean
    public Job jsonFileItemReaderJob() {
        return jobBuilderFactory.get("jsonFileItemReaderJob")
                .start(jsonFileItemReaderStep())
                .build();
    }

    /**
     * Builds the chunk-oriented step: items are read from the JSON file and
     * handed to the writer two at a time, which prints each of them.
     *
     * @return the configured step
     */
    private Step jsonFileItemReaderStep() {
        return stepBuilderFactory.get("step")
                // Explicit type arguments keep reader and writer typed as JsonVo.
                .<JsonVo, JsonVo>chunk(2)
                .reader(jsonItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    /**
     * Creates the JSON reader backed by a Jackson object reader for {@link JsonVo}.
     *
     * @return the configured reader
     */
    private ItemReader<JsonVo> jsonItemReader() {
        // JSON file to read, resolved from the classpath.
        ClassPathResource jsonFile = new ClassPathResource("file.json");
        // Target type each JSON object is converted to.
        JacksonJsonObjectReader<JsonVo> objectReader = new JacksonJsonObjectReader<>(JsonVo.class);
        JsonItemReader<JsonVo> reader = new JsonItemReader<>(jsonFile, objectReader);
        // Give the reader a name.
        reader.setName("testDataJsonItemReader");
        return reader;
    }
}
测试结果。
12.6 多文本数据读取
多文本的数据读取本质还是单文件数据读取,区别就是多文件读取需要在单文件读取的方式上设置一层代理。在resources目录下新建两个文件file1和file2。
// file1
1,11,12,13
2,21,22,23
3,31,32,33
4,41,42,43
5,51,52,53
6,61,62,63
十三、常见问题
问题:
Caused by: org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job execution for this job is already running: JobExecution: id=152, version=1, startTime=2020-12-02 10:38:40.348, endTime=null, lastUpdated=2020-12-02 10:38:40.349, status=STARTED, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=59, version=0, Job=[simpleItemReaderJob]], jobParameters=[{}]
解决:
由于simpleItemReaderJob上一次执行时发生异常、未能正常结束,Spring Batch的元数据表中仍保留着状态为STARTED的执行记录,因此再次启动时会被判定为"该作业正在运行"。删掉相关的记录,或者恢复数据库即可解决;后续应检查simpleItemReaderJob并妥善处理异常,避免作业再次非正常终止。
作者(Author):小强崽
来源(Source):https://www.wuduoqiang.com/archives/SpringBoot整合SpringBatch
协议(License):署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)
版权(Copyright):商业转载请联系作者获得授权,非商业转载请注明出处。 For commercial use, please contact the author for authorization. For non-commercial use, please indicate the source.