NO END FOR LEARNING

Writing blog if you feel tired | 学海无涯 苦写博客

一起学学Spring Batch(二)

| Comments

所有的Batch处理都可以以一种最简单的方式描述,那就是读取大量的数据,然后进行某种方式的计算或者变形,最后输出写入到某个存储系统中。Spring Batch提供了三个简单的接口来帮助执行大量数据的读写,分别是ItemReader、ItmeWriter和ItermProcessor,你可以说它们是抽象出来的一种简单概念,它们同样也是针对不同类型数据的输入、输出和处理的方法。

我们最常用到的数据类型有三种:数据库、XML和Flat文件。为了能够清晰的描述Batch处理的流程,我们采用Flat这种文件作为输入和输出数据源,排除数据库操作和XML文件解析的带来的理解上的复杂度。

“Flat文件是一种包含没有相对关系结构的记录的文件。这个类型通常用来描述文字处理、其他结构字符或标记被移除了的文本。在任何事件中,许多用户把保存成“纯文本(text only)”类型的Microsoft Word文档叫做“Flat File(flat file)”。Flat File的另一种形式是包含了用ASCII码记录的,每个表单元由逗号分隔,用行表示记录组的文件。这种文件页叫做用逗号分隔数值(CSV)的文件.”

这里给出一个例子,从一个csv格式的flat文件中读取英语、数学、文学的分数,然后计算平均分,将结果写入另一个csv格式文件。

首先是两个Model,分别是输入的数据模型ExamScore和输出的数据模型AverageScore。

ExamScore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package me.zeph.spring.batch.model;

public class ExamScore {
  private int id;
  private int english;
  private int math;
  private int literature;

  public int getId() {
      return id;
  }
  
  public void setId(int id) {
      this.id = id;
  }

  public int getEnglish() {
      return english;
  }

  public void setEnglish(int english) {
      this.english = english;
  }

  public int getMath() {
      return math;
  }

  public void setMath(int math) {
      this.math = math;
  }

  public int getLiterature() {
      return literature;
  }
  
  public void setLiterature(int literature) {
      this.literature = literature;
  }
  
}
AverageScore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package me.zeph.spring.batch.model;

public class AverageScore {

  private int id;
  private double score;

  public int getId() {
      return id;
  }

  public void setId(int id) {
      this.id = id;
  }

  public double getScore() {
      return score;
  }

  public void setScore(double score) {
      this.score = score;
  }

}

然后是Processor,这里实现ItemProcessor接口。在process方法中传入输入数据模型,返回输出数据模型。

ScoreProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package me.zeph.spring.batch.processor;

import me.zeph.spring.batch.model.AverageScore;
import me.zeph.spring.batch.model.ExamScore;
import org.springframework.batch.item.ItemProcessor;

public class ScoreProcessor implements ItemProcessor<ExamScore, AverageScore> {

  public static final double COUNT = 3.0;

  @Override
  public AverageScore process(ExamScore examScore) throws Exception {
      AverageScore averageScore = new AverageScore();
      averageScore.setId(examScore.getId());
      averageScore.setScore(calculateAverageScore(examScore));
      return averageScore;
  }

  private double calculateAverageScore(ExamScore examScore) {
      return (examScore.getEnglish() + examScore.getLiterature() + examScore.getMath()) / COUNT;
  }
}

和上篇文章的一样,在spring-batch-beans.xml中定义了关于Sping Batch的一些基础Bean。

spring-batch-beans.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

</beans>

重点是下面一个xml文件spring-batch-jobs.xml,在这个里面定义所有关于Batch Job的具体内容。

spring-batch-jobs.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

    <import resource="spring-batch-beans.xml"/>

    <bean id="csvFileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="file:/Users/twer/Documents/gradle-project/spring-batch-example/src/main/resources/reader_file.csv"/>
        <property name="lineMapper" ref="lineMapper"/>
    </bean>

    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer" ref="lineTokenizer"/>
        <property name="fieldSetMapper" ref="fieldSetMapper"/>
    </bean>

    <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names" value="id,english,math,literature"/>
    </bean>

    <bean id="fieldSetMapper" class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="targetType" value="me.zeph.spring.batch.model.ExamScore"/>
    </bean>

    <bean id="csvFileProcessor" class="me.zeph.spring.batch.processor.ScoreProcessor"/>

    <bean id="csvFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:/Users/twer/Documents/gradle-project/spring-batch-example/src/main/resources/writer_file.csv"/>
        <property name="lineAggregator" ref="lineAggregator"/>
    </bean>

    <bean id="lineAggregator" class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
        <property name="delimiter" value=","/>
        <property name="fieldExtractor" ref="fieldExtractor"/>
    </bean>

    <bean id="fieldExtractor" class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
        <property name="names" value="id,score"/>
    </bean>

    <batch:job id="fileTransfer">
        <batch:step id="step">
            <batch:tasklet>
                <batch:chunk reader="csvFileReader" processor="csvFileProcessor" writer="csvFileWriter" commit-interval="1"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

</beans>

让我们来一起分析一下上面这个xml配置。

FlatFileItemReader是Flat文件的读取类,可以按行读取输入文件。这里传入两个参数,LineMapper和Resource。

Resource顾名思义,用来指定数据源。

LineMapper用于将文件中的每行数据映射到对应的数据模型,这里使用DefaultLineMapper。LineMapper中传入两个参数,分别是行分词器LineTokenizer和域映射器FieldSetMapper。

对应的LineTokenizer采用基于分隔符的分词器,而对应的FieldSetMapper需要传入待映射的数据模型类型。

对于FlatFileItemWriter,同样要配置对应的输出源Resource和行聚合器lineAggregator。

对于lineAggregator这里同样采用基于分隔符的聚合器DelimitedLineAggregator,需要给聚合器配置对应的分隔符(比如“,”)和域提取器BeanWrapperFieldExtractor。

在域提取器BeanWrapperFieldExtractor中配置要被提取的值。

最后就是定义具体的Batch Job。

spring-batch-jobs.xml
1
2
3
4
5
6
7
<batch:job id="fileTransfer">
        <batch:step id="step">
            <batch:tasklet>
                <batch:chunk reader="csvFileReader" processor="csvFileProcessor" writer="csvFileWriter" commit-interval="1"/>
            </batch:tasklet>
        </batch:step>
</batch:job>

运行结果:上面是输入,下面是输出

1,100,80,80
2,100,70,80
3,100,80,70

1,86.66666666666667
2,83.33333333333333
3,83.33333333333333

一起学学Spring Batch(一)

| Comments

批处理应用(Batch Application)主要是在没有人工干预的情况下处理大量数据。比如,不同系统之间数据文件的导入和导出,数据的计算,定期生成财务报告等等。

Spring Batch是一个轻量级的,复杂的Batch框架,能够让你开发针对企业级系统日常操作的Batch应用。

Spring Batch提供的有用特性:

  1. Spring框架中的基础,依赖注入,切面编程,企业级别的支持
  2. 面向批处理的运行时,有效的驱动批处理应用的流程
  3. 有效处理数据,以最佳策略读写数据
  4. 可用的现成组件,提供能够定位到不同批处理场景的组件

Spring框架以轻量级容器闻名,提供配置方式来完成应用程序组件的组装。

Spring Batch对Spring框架进行了扩展,提供了专用的xml的命名空间来配置批处理的过程。

有效处理数据

以一种经典场景为例:从一个数据系统读取数据存储到另一个数据系统中。如果一次将整个数据系统中的数据读入到内存,JVM随时可能Out of Memory。当然将所有数据读入到内存肯定不是最好的办法。

Spring Batch使用一种更加有效的办法,叫做Chunk Processing(块处理方式):以数据流的方式读取输入的源数据,并处理一定数量(这一块)的记录,并写入到对应的组件。

这个块的大小可以改变,所以仍然可以让批处理一条一条的处理数据。

Ready-to-use Component

Spring Batch提供了基础设施来执行块处理,并代理I/O到专用的组件,并命名为reader和writer。

Spring为一些通用的批处理场景提供可用的组件,这些组件可以让你更加专注于业务逻辑。

Spring Batch对reader和writer场景的支持技术。

数据类型 技术格式
Database JDBC
Database Hibernate
Database JPA
Database iBatis
File Flatfile
File XML

当然Spring Batch支持的不止这些技术。当然如果没有你需要的实现,你也可以自己是实现对应的组件。

Spring Batch不是Scheduler

Spring Batch可以驱动批处理流程但是不会提供启动它们的支持,特别是基于时间的启动。Spring Batch一般会讲这些工作代理给其他Scheduler工具,例如Quartz或者Cron。

下面是一个Spring Batch Job的Hello World Example

build.gradle
1
2
3
4
5
6
7
8
9
10
11
12
apply plugin: 'java'
apply plugin: 'idea'

repositories {
 mavenCentral()
}

dependencies {
 compile 'org.springframework.batch:spring-batch-core:2.2.5.RELEASE'
 compile 'org.springframework:spring-context:3.2.8.RELEASE'
 testCompile group: 'junit', name: 'junit', version: '4.+'
}

在这里定义一个具体的task。

tasklet.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package me.zeph.spring.batch.tasklet;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import static org.springframework.batch.repeat.RepeatStatus.FINISHED;

public class HelloTasklet implements Tasklet {

   @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
      System.out.println("Hello Spring Batch!");
      return FINISHED;
  }

}
beans.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

   <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

   <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
   </bean>

   <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
   </bean>
</beans>

在xml中配置一个具体batch job,这里只有一步,那就是执行上面定义的task。

batch-job.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

   <import resource="spring-batch-beans.xml"/>

   <bean id="helloTasklet" class="me.zeph.spring.batch.tasklet.HelloTasklet"/>

   <batch:job id="helloJob">
        <batch:step id="helloStep">
            <batch:tasklet ref="helloTasklet"/>
        </batch:step>
   </batch:job>

</beans>

在main函数中,通过JobLauncher来运行一个Job。

main.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package me.zeph.spring.batch.tasklet;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TaskletRunner {
  public static void main(String args[]) throws Exception {
      String[] configLocations = {"spring-batch-beans.xml", "spring-batch-jobs.xml"};
      ApplicationContext applicationContext = new ClassPathXmlApplicationContext(configLocations);
      JobLauncher jobLauncher = applicationContext.getBean(JobLauncher.class);
      Job job = applicationContext.getBean(Job.class);
      JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
      System.out.println("JOB EXECUTION STATUS:" + jobExecution.getExitStatus().getExitCode());
  }
}