Spring Batch (Part 2): Setup

Architecture diagram

[Figure: Spring Batch architecture diagram]

Additional concepts

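Object | Description
Job Instance | A job instance. A new instance is created when a job is launched with a new set of parameters, and it is stored in the JobRepository. If the job fails and is run again with the same parameters, the same instance is reused. In short: Job Instance = Job + Job Parameters
Job Parameters | The parameters used to launch a batch job. Any parameters the job needs can be set at launch. Note that job parameters also identify the job instance: different instances of a Job are told apart by their parameters
Job Execution | A single attempt to run a Job. Every run of a Job creates a new Job Execution
Step Execution | A single attempt to run a Step. Every run of a Step creates a new Step Execution
Tasklet | The concrete logic executed inside a Step; it can be configured to run synchronously, asynchronously, repeatedly, and so on
Execution Context | A set of key/value pairs persisted and controlled by the framework, which lets developers save state that must be persisted within the scope of a Step Execution or a Job Execution
Item | A single data record
Chunk | A collection of Items of a given size; the read, process, and write operations on a Chunk, the commit interval, and so on can be configured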
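
To make the relationship between Job, Job Parameters, and Job Instance in the table above more concrete, here is a minimal plain-Java sketch. It is not Spring Batch code: `JobInstanceSketch` and `instanceIdFor` are hypothetical names, and the map only stands in for the bookkeeping a JobRepository performs. It assumes an instance is identified by the job name together with its parameters.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JobRepository bookkeeping; not Spring Batch code.
public class JobInstanceSketch {

    // (job name, parameters) -> instance id
    private final Map<List<Object>, Long> instances = new HashMap<>();
    private long nextId = 1;

    // Returns the id already registered for this name/parameter pair,
    // or registers a new instance and returns its id.
    public long instanceIdFor(String jobName, Map<String, String> parameters) {
        List<Object> key = Arrays.<Object>asList(jobName, new HashMap<>(parameters));
        Long existing = instances.get(key);
        if (existing != null) {
            return existing;
        }
        long id = nextId++;
        instances.put(key, id);
        return id;
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        Map<String, String> day1 = Collections.singletonMap("date", "2019-08-29");
        Map<String, String> day2 = Collections.singletonMap("date", "2019-08-30");

        long first = repository.instanceIdFor("job", day1);
        long retry = repository.instanceIdFor("job", day1); // same parameters: same instance
        long next = repository.instanceIdFor("job", day2);  // new parameters: new instance

        System.out.println(first == retry); // prints true
        System.out.println(first == next);  // prints false
    }
}
```

This is also why the test controller later in this post passes the current date as a parameter: each request then carries different parameters and therefore starts a fresh instance.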

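Domain objects in detail

Job (interface)

Job interface methods

  • getName() returns the Job name
  • isRestartable() returns whether the job can be restarted; true means it can
  • execute(JobExecution execution) runs the given JobExecution
  • getJobParametersIncrementer() returns the JobParametersIncrementer, which derives the parameters for the next run
  • getJobParametersValidator() returns the JobParametersValidator (parameter validator) instance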

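Step (interface)

Step interface methods

  • getName() returns the Step name
  • execute(StepExecution stepExecution) executes the step and stores the execution metadata in the StepExecution
  • getStartLimit() returns the maximum number of times the step may be started
  • isAllowStartIfComplete() returns true if the step may be started again even after it has completed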

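ItemReader (interface)

ItemReader interface methods

  • read() reads the next item and returns it as a model object; returns null when the input is exhausted

ItemProcessor (interface)

ItemProcessor interface methods

  • process(I item) processes an item and returns the result; returning null filters the item out

ItemWriter (interface)

ItemWriter interface methods

  • write(List<? extends T> items) writes a chunk of items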
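
How the three interfaces above cooperate in a chunk-oriented step can be sketched in plain Java. This is a simplified illustration, not the framework's implementation: `ChunkLoopSketch` and `run` are hypothetical names, an `Iterator` stands in for the ItemReader, a `Function` for the ItemProcessor, and the returned list of chunks for what the ItemWriter would receive. Transactions, restart state, and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of a chunk-oriented step; not Spring Batch code.
public class ChunkLoopSketch {

    // Reads up to chunkSize items, processes each one, then "writes" the chunk.
    // Returns every chunk handed to the writer so the grouping is visible.
    public static <I, O> List<List<O>> run(Iterator<I> reader,
                                           Function<I, O> processor,
                                           int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        while (reader.hasNext()) {
            // 1. read: collect at most chunkSize items
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. process: a null result means the item is filtered out
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O result = processor.apply(item);
                if (result != null) {
                    outputs.add(result);
                }
            }
            // 3. write: the whole chunk is passed on at once
            written.add(outputs);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("小张,2019,50", "小张,2018,100", "小李,2016,20");
        List<List<String>> chunks = run(lines.iterator(), line -> line, 2);
        for (List<String> chunk : chunks) {
            System.out.println("chunk of " + chunk.size() + ": " + chunk);
        }
    }
}
```

With three input lines and a chunk size of 2, the writer is called twice: once with two items and once with the remaining one.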

JobRepository (interface)

JobRepository interface methods

  • isJobInstanceExists(String jobName, JobParameters jobParameters)
  • createJobInstance(String jobName, JobParameters jobParameters)
  • createJobExecution(JobInstance jobInstance, JobParameters jobParameters, String jobConfigurationLocation)
  • createJobExecution(String jobName, JobParameters jobParameters)
  • update(JobExecution jobExecution)
  • add(StepExecution stepExecution)
  • addAll(Collection<StepExecution> stepExecutions)
  • update(StepExecution stepExecution)
  • updateExecutionContext(StepExecution stepExecution)
  • updateExecutionContext(JobExecution jobExecution)
  • getLastStepExecution(JobInstance jobInstance, String stepName)
  • getStepExecutionCount(JobInstance jobInstance, String stepName)
  • getLastJobExecution(String jobName, JobParameters jobParameters)

JobExplorer (interface)

JobExplorer interface methods

  • getJobInstances(String jobName, int start, int count)
  • getJobExecution(@Nullable Long executionId)
  • getStepExecution(@Nullable Long jobExecutionId, @Nullable Long stepExecutionId)
  • getJobInstance(@Nullable Long instanceId)
  • getJobExecutions(JobInstance jobInstance)
  • findRunningJobExecutions(@Nullable String jobName)
  • getJobNames()
  • findJobInstancesByJobName(String jobName, int start, int count)
  • getJobInstanceCount(@Nullable String jobName)

JobLauncher (interface)

JobLauncher interface methods

  • run(Job job, JobParameters jobParameters)

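Setup

Creating the project (in-memory version)

0. Task flow

Use Spring Batch to read the contents of user-1-account.csv,
process them, and write the result to user-1-account-result.csv

[Figure: springbacthreadfile.png]

1. Create the project with an IDE (the author uses IDEA)

2. Select the following options
[Figure: springbatchcreateproject.png]

3. Modify the pom file as follows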

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.adongs</groupId>
    <artifactId>spring-batch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-batch</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
            <exclusions>
               <!-- Exclude JDBC; otherwise the database-backed mode is used -->
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-jdbc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
      <!-- Test dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

4. Disable automatic job execution at startup

spring:
  batch:
    job:
      enabled: false

5. Configure MapBatchConfig (an in-memory batch configuration); it must implement the BatchConfigurer interface

package com.adongs.springbatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.configuration.BatchConfigurationException;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.annotation.PostConstruct;

/**
 * In-memory batch configuration
 * @Author yudong
 * @Date 2019/8/16 10:09 AM
 * @Version 1.0
 */
@Configuration
public class MapBatchConfig implements BatchConfigurer {

    private static final Log logger = LogFactory.getLog(MapBatchConfig.class);

    /**
     * Transaction manager
     */
    private PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
    private JobRepository jobRepository;
    private JobLauncher jobLauncher;
    private JobExplorer jobExplorer;
    private MapJobRepositoryFactoryBean factoryBean;

    public MapBatchConfig() {}


    @Override
    @Bean
    public JobRepository getJobRepository() {
        return jobRepository;
    }

    @Bean
    public MapJobRegistry mapJobRegistry(){
        return new MapJobRegistry();
    }

    /**
     * Job builder factory
     * @return
     */
    @Bean
    public JobBuilderFactory jobBuilderFactory(){
        return new  JobBuilderFactory(jobRepository);
    }

    /**
     * Step builder factory
     * @return
     */
    @Bean
    public StepBuilderFactory stepBuilderFactory(){
        return new StepBuilderFactory(jobRepository,transactionManager);
    }

    @Override
    @Bean
    public PlatformTransactionManager getTransactionManager() {
        return transactionManager;
    }

    @Override
    @Bean
    public JobLauncher getJobLauncher() {
        return jobLauncher;
    }

    @Override
    @Bean
    public JobExplorer getJobExplorer() {
        return jobExplorer;
    }

    /**
     * Initializes the job repository, job explorer, and job launcher
     */
    @PostConstruct
    public void initialize() {
        try {
            this.jobRepository = createJobRepository();
            this.jobExplorer = createJobExplorer();
            this.jobLauncher = createJobLauncher();
        } catch (Exception e) {
            throw new BatchConfigurationException(e);
        }
    }

    /**
     * Creates the job launcher
     * @return
     * @throws Exception
     */
    public JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }




    /**
     * Creates the in-memory JobRepository
     * @return
     * @throws Exception
     */
    public JobRepository createJobRepository() throws Exception {
        factoryBean = new MapJobRepositoryFactoryBean();
        factoryBean.setTransactionManager(getTransactionManager());
        factoryBean.afterPropertiesSet();
        JobRepository jobRepository = factoryBean.getObject();
        return jobRepository;
    }

    /**
     * Creates the in-memory JobExplorer
     * @return
     * @throws Exception
     */
    public JobExplorer createJobExplorer() throws Exception {
        MapJobExplorerFactoryBean mapJobExplorerFactoryBean = new MapJobExplorerFactoryBean(factoryBean);
        mapJobExplorerFactoryBean.afterPropertiesSet();
        return mapJobExplorerFactoryBean.getObject();
    }
}

6. Create two files under the resources folder, user-1-account.csv and user-1-account-result.csv, and put the following content into user-1-account.csv

小张,2019,50
小张,2018,100
小李,2016,20

7. Create the account entity

package com.adongs.springbatch;

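import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * Account entity.
 * Declared with prototype scope because BeanWrapperFieldSetMapper
 * (setPrototypeBeanName("account")) needs a new Account for every line.
 * With the default singleton scope all items in a chunk are the same object,
 * which is consistent with the duplicated first line in the recorded output below.
 * @Author yudong
 * @Date 2019/8/15 4:57 PM
 * @Version 1.0
 */
@Component
@Scope("prototype")
public class Account {

    /**
     * Name
     */
    private String name;

    /**
     * Date
     */
    private String date;

    /**
     * Amount
     */
    private int money;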

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDate() {
        return this.date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public int getMoney() {
        return this.money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    @Override
    public String toString() {
        return "Account{" +
                "name='" + name + '\'' +
                ", date=" + date +
                ", money=" + money +
                '}';
    }
}

8. Configure the job

package com.adongs.springbatch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

/**
 * @Author yudong
 * @Date 2019/8/16 11:11 AM
 * @Version 1.0
 */
@Configuration
public class JobConfig {


    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Configures the Job
     * @return
     */
    @Bean
    public Job footballJob(Step step){
        return this.jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .listener(sampleListener())
                .start(step).build();
    }

    /**
     * Builds the step
     * @param reader
     * @param processor
     * @param flatFileItemWriter
     * @return
     */
    @Bean
    public Step reconciliation(FlatFileItemReader reader,
                               ItemProcessor processor,
                               FlatFileItemWriter flatFileItemWriter){

        return  stepBuilderFactory.get("reconciliation")
                .<Account,Account> chunk(2)
                .reader(reader)
                .processor(processor)
                .writer(flatFileItemWriter)
                .build();
    }

    /**
     * Reads the input file
     * @return
     */
    @Bean
    public FlatFileItemReader flatFileItemReader(){
        FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
        flatFileItemReader.setResource(new ClassPathResource("user-1-account.csv"));
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setDelimiter(",");
        delimitedLineTokenizer.setNames("name","date","money");
        BeanWrapperFieldSetMapper beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper();
        beanWrapperFieldSetMapper.setBeanFactory(applicationContext);
        beanWrapperFieldSetMapper.setPrototypeBeanName("account");
        DefaultLineMapper defaultLineMapper = new DefaultLineMapper();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        flatFileItemReader.setLineMapper(defaultLineMapper);
        return flatFileItemReader;
    }

    /**
     * Processor
     * @return
     */
    @Bean
    public ItemProcessor creditBillProcessor(){
        return new ItemProcessor<Account,Account>(){
            @Override
            public Account process(Account item) throws Exception {
                System.out.println(item.toString());
                return item;
            }
        };
    }

    /**
     * Writes the output file
     * @return
     */
    @Bean
    public FlatFileItemWriter flatFileItemWriter(){
        BeanWrapperFieldExtractor beanWrapperFieldExtractor = new BeanWrapperFieldExtractor();
        beanWrapperFieldExtractor.setNames(new String[]{"name","date","money"});
        DelimitedLineAggregator delimitedLineAggregator = new DelimitedLineAggregator();
        delimitedLineAggregator.setDelimiter(",");
        delimitedLineAggregator.setFieldExtractor(beanWrapperFieldExtractor);
        FlatFileItemWriter flatFileItemWriter = new FlatFileItemWriter();
        flatFileItemWriter.setResource(new ClassPathResource("user-1-account-result.csv"));
        flatFileItemWriter.setLineAggregator(delimitedLineAggregator);
        return flatFileItemWriter;
    }



    /**
     * Job execution listener
     * @return
     */
    @Bean
    public JobExecutionListener sampleListener(){
        return new JobExecutionListener(){
            @Override
            public void beforeJob(JobExecution jobExecution) {
                System.out.println("JobExecutionListener.beforeJob");
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                System.out.println("JobExecutionListener.afterJob");
            }
        };
    }
}

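9. Write a test controller

package com.adongs.springbatch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * @Author yudong
 * @Date 2019/8/15 7:34 PM
 * @Version 1.0
 */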
@RestController
public class TestRunController {


    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @GetMapping("test")
    public String test() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters jobParameters = new JobParametersBuilder().addDate("date",new Date()).toJobParameters();
        jobLauncher.run(job,jobParameters);
        return "ok";
    }

}

Testing

1. Start the project

2. Request http://localhost:8080/test; the output is as follows

2019-08-30 16:20:42.139  INFO 57596 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{date=1567153242078}]
JobExecutionListener.beforeJob
2019-08-30 16:20:42.153  INFO 57596 --- [nio-8080-exec-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [reconciliation]
Account{name='小张', date=2018, money=100}
Account{name='小张', date=2018, money=100}
Account{name='小李', date=2016, money=20}
JobExecutionListener.afterJob
2019-08-30 16:20:42.412  INFO 57596 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{date=1567153242078}] and the following status: [COMPLETED]

3. Inspect target/classes/user-1-account-result.csv in the project

小张,2018,100
小张,2018,100
小李,2016,20
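
Note that the result starts with two identical lines, although the first input line is 小张,2019,50. That is the pattern you get when every line is bound onto one shared object, which happens if the Account bean handed to BeanWrapperFieldSetMapper is a singleton rather than a prototype. The plain-Java sketch below reproduces the effect without Spring, under that assumption; `SharedBeanSketch`, `Row`, and `readAndWrite` are hypothetical names, and the chunking is reduced to a simple loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of object aliasing inside a chunk; not Spring Batch code.
public class SharedBeanSketch {

    // Minimal stand-in for the Account entity.
    static class Row {
        String name;
        String date;
        int money;

        // Overwrites this object's fields with the values of one CSV line.
        void bind(String line) {
            String[] parts = line.split(",");
            name = parts[0];
            date = parts[1];
            money = Integer.parseInt(parts[2].trim());
        }

        @Override
        public String toString() {
            return name + "," + date + "," + money;
        }
    }

    // Reads lines chunk by chunk and renders each chunk only after it has been read completely.
    // shareOneObject = true mimics a singleton bean, false mimics a prototype bean.
    static List<String> readAndWrite(List<String> lines, int chunkSize, boolean shareOneObject) {
        List<String> output = new ArrayList<>();
        Row shared = new Row();
        for (int start = 0; start < lines.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, lines.size());
            List<Row> chunk = new ArrayList<>();
            for (String line : lines.subList(start, end)) {
                Row row = shareOneObject ? shared : new Row();
                row.bind(line);
                chunk.add(row);
            }
            for (Row row : chunk) {
                output.add(row.toString());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("小张,2019,50", "小张,2018,100", "小李,2016,20");
        System.out.println(readAndWrite(lines, 2, true));  // first two entries are identical
        System.out.println(readAndWrite(lines, 2, false)); // entries match the input lines
    }
}
```

With a shared object and a chunk size of 2, the first chunk holds the same object twice, so both entries show the values of the line read last. Creating a new object per line keeps all three records distinct.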

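Creating the project (database version, works out of the box)

0. Task flow

Use Spring Batch to read the contents of user-1-account.csv,
process them, and write the result to user-1-account-result.csv

[Figure: springbacthreadfile5d946318.png]

1. Create the project with an IDE (the author uses IDEA)

2. Select the following options
[Figure: springbatchcreateprojecteeb51639.png]

3. Modify the pom file as follows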

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.adongs</groupId>
    <artifactId>spring-batch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-batch</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

4. Modify the configuration file application.yml

spring:
  batch:
    job:
      enabled: false
  datasource:
    url: jdbc:mysql://localhost:3306/test
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver

5. Create the account entity

package com.adongs.springbatch;

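import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * Account entity.
 * Declared with prototype scope because BeanWrapperFieldSetMapper
 * (setPrototypeBeanName("account")) needs a new Account for every line.
 * With the default singleton scope all items in a chunk are the same object,
 * which is consistent with the duplicated first line in the recorded output below.
 * @Author yudong
 * @Date 2019/8/15 4:57 PM
 * @Version 1.0
 */
@Component
@Scope("prototype")
public class Account {

    /**
     * Name
     */
    private String name;

    /**
     * Date
     */
    private String date;

    /**
     * Amount
     */
    private int money;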

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDate() {
        return this.date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public int getMoney() {
        return this.money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    @Override
    public String toString() {
        return "Account{" +
                "name='" + name + '\'' +
                ", date=" + date +
                ", money=" + money +
                '}';
    }
}

6. Create a configuration class, JobConfig

package com.adongs.springbatch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.ClassPathResource;

/**
 * @Author yudong
 * @Date 2019/8/16 11:11 AM
 * @Version 1.0
 */
@Configuration
@EnableBatchProcessing
public class JobConfig {


    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Configures the Job
     * @return
     */
    @Bean
    public Job footballJob(Step step){
        return this.jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .listener(sampleListener())
                .start(step).build();
    }

    /**
     * Builds the step
     * @param reader
     * @param processor
     * @param flatFileItemWriter
     * @return
     */
    @Bean
    public Step reconciliation(FlatFileItemReader reader,
                               ItemProcessor processor,
                               FlatFileItemWriter flatFileItemWriter){

        return  stepBuilderFactory.get("reconciliation")
                .<Account,Account> chunk(2)
                .reader(reader)
                .processor(processor)
                .writer(flatFileItemWriter)
                .build();
    }

    /**
     * Reads the input file
     * @return
     */
    @Bean
    public FlatFileItemReader flatFileItemReader(){
        FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
        flatFileItemReader.setResource(new ClassPathResource("user-1-account.csv"));
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setDelimiter(",");
        delimitedLineTokenizer.setNames("name","date","money");
        BeanWrapperFieldSetMapper beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper();
        beanWrapperFieldSetMapper.setBeanFactory(applicationContext);
        beanWrapperFieldSetMapper.setPrototypeBeanName("account");
        DefaultLineMapper defaultLineMapper = new DefaultLineMapper();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        flatFileItemReader.setLineMapper(defaultLineMapper);
        return flatFileItemReader;
    }


    /**
     * Processor
     * @return
     */
    @Bean
    public ItemProcessor creditBillProcessor(){
        return new ItemProcessor<Account,Account>(){
            @Override
            public Account process(Account item) throws Exception {
                System.out.println(item.toString());
                return item;
            }
        };
    }

    /**
     * Writes the output file
     * @return
     */
    @Bean
    public FlatFileItemWriter flatFileItemWriter(){
        BeanWrapperFieldExtractor beanWrapperFieldExtractor = new BeanWrapperFieldExtractor();
        beanWrapperFieldExtractor.setNames(new String[]{"name","date","money"});
        DelimitedLineAggregator delimitedLineAggregator = new DelimitedLineAggregator();
        delimitedLineAggregator.setDelimiter(",");
        delimitedLineAggregator.setFieldExtractor(beanWrapperFieldExtractor);
        FlatFileItemWriter flatFileItemWriter = new FlatFileItemWriter();
        flatFileItemWriter.setResource(new ClassPathResource("user-1-account-result.csv"));
        flatFileItemWriter.setLineAggregator(delimitedLineAggregator);
        return flatFileItemWriter;
    }



    /**
     * Job execution listener
     * @return
     */
    @Bean
    public JobExecutionListener sampleListener(){
        return new JobExecutionListener(){
            @Override
            public void beforeJob(JobExecution jobExecution) {
                System.out.println("JobExecutionListener.beforeJob");
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
                System.out.println("JobExecutionListener.afterJob");
            }
        };
    }
}

7. Create the test controller, TestRunController

package com.adongs.springbatch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @Author yudong
 * @Date 2019/8/15 7:34 PM
 * @Version 1.0
 */
@RestController
public class TestRunController {


    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @GetMapping("test")
    public String test() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters jobParameters = new JobParametersBuilder().addDate("date",new Date()).toJobParameters();
        jobLauncher.run(job,jobParameters);
        return "ok";
    }

}

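8. Create the database and its tables

  • 1. Create a database named test
  • 2. Locate the Spring Batch core jar (org.springframework.batch:spring-batch-core:4.1.2.RELEASE)
    [Figure: springbatchmysql1.png]
  • 3. Open the package org.springframework.batch.core and find schema-mysql.sql
    [Figure: springbatchmysql2.png]
  • 4. Execute the script in MySQL

9. Create two files under the resources folder, user-1-account.csv and user-1-account-result.csv, and put the following content into user-1-account.csv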

小张,2019,50
小张,2018,100
小李,2016,20

Testing

1. Start the project

2. Request http://localhost:8080/test; the output is as follows

2019-08-30 16:20:42.139  INFO 57596 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{date=1567153242078}]
JobExecutionListener.beforeJob
2019-08-30 16:20:42.153  INFO 57596 --- [nio-8080-exec-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [reconciliation]
Account{name='小张', date=2018, money=100}
Account{name='小张', date=2018, money=100}
Account{name='小李', date=2016, money=20}
JobExecutionListener.afterJob
2019-08-30 16:20:42.412  INFO 57596 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{date=1567153242078}] and the following status: [COMPLETED]

3. Inspect target/classes/user-1-account-result.csv in the project

小张,2018,100
小张,2018,100
小李,2016,20

4. Check the database; the job metadata has been recorded
[Figure: springbatchmysqldb.png]

Related resources

1. Demo repository (in-memory mode, master branch)
2. Demo repository (database mode, db branch)
