NO END FOR LEARNING

Writing blog if you feel tired | 学海无涯 苦写博客

一起学学Spring Batch(二)

| Comments

所有的Batch处理都可以以一种最简单的方式描述,那就是读取大量的数据,然后进行某种方式的计算或者变形,最后输出写入到某个存储系统中。Spring Batch提供了三个简单的接口来帮助执行大量数据的读写,分别是ItemReader、ItmeWriter和ItermProcessor,你可以说它们是抽象出来的一种简单概念,它们同样也是针对不同类型数据的输入、输出和处理的方法。

我们最常用到的数据类型有三种:数据库、XML和Flat文件。为了能够清晰的描述Batch处理的流程,我们采用Flat这种文件作为输入和输出数据源,排除数据库操作和XML文件解析的带来的理解上的复杂度。

“Flat文件是一种包含没有相对关系结构的记录的文件。这个类型通常用来描述文字处理、其他结构字符或标记被移除了的文本。在任何事件中,许多用户把保存成“纯文本(text only)”类型的Microsoft Word文档叫做“Flat File(flat file)”。Flat File的另一种形式是包含了用ASCII码记录的,每个表单元由逗号分隔,用行表示记录组的文件。这种文件页叫做用逗号分隔数值(CSV)的文件.”

这里给出一个例子,从一个csv格式的flat文件中读取英语、数学、文学的分数,然后计算平均分,将结果写入另一个csv格式文件。

首先是两个Model,分别是输入的数据模型ExamScore和输出的数据模型AverageScore。

ExamScore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package me.zeph.spring.batch.model;

public class ExamScore {
  private int id;
  private int english;
  private int math;
  private int literature;

  public int getId() {
      return id;
  }
  
  public void setId(int id) {
      this.id = id;
  }

  public int getEnglish() {
      return english;
  }

  public void setEnglish(int english) {
      this.english = english;
  }

  public int getMath() {
      return math;
  }

  public void setMath(int math) {
      this.math = math;
  }

  public int getLiterature() {
      return literature;
  }
  
  public void setLiterature(int literature) {
      this.literature = literature;
  }
  
}
AverageScore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package me.zeph.spring.batch.model;

public class AverageScore {

  private int id;
  private double score;

  public int getId() {
      return id;
  }

  public void setId(int id) {
      this.id = id;
  }

  public double getScore() {
      return score;
  }

  public void setScore(double score) {
      this.score = score;
  }

}

然后是Processor,这里实现ItemProcessor接口。在process方法中传入输入数据模型,返回输出数据模型。

ScoreProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package me.zeph.spring.batch.processor;

import me.zeph.spring.batch.model.AverageScore;
import me.zeph.spring.batch.model.ExamScore;
import org.springframework.batch.item.ItemProcessor;

public class ScoreProcessor implements ItemProcessor<ExamScore, AverageScore> {

  public static final double COUNT = 3.0;

  @Override
  public AverageScore process(ExamScore examScore) throws Exception {
      AverageScore averageScore = new AverageScore();
      averageScore.setId(examScore.getId());
      averageScore.setScore(calculateAverageScore(examScore));
      return averageScore;
  }

  private double calculateAverageScore(ExamScore examScore) {
      return (examScore.getEnglish() + examScore.getLiterature() + examScore.getMath()) / COUNT;
  }
}

和上篇文章的一样,在spring-batch-beans.xml中定义了关于Sping Batch的一些基础Bean。

spring-batch-beans.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

</beans>

重点是下面一个xml文件spring-batch-jobs.xml,在这个里面定义所有关于Batch Job的具体内容。

spring-batch-jobs.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

    <import resource="spring-batch-beans.xml"/>

    <bean id="csvFileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="file:/Users/twer/Documents/gradle-project/spring-batch-example/src/main/resources/reader_file.csv"/>
        <property name="lineMapper" ref="lineMapper"/>
    </bean>

    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer" ref="lineTokenizer"/>
        <property name="fieldSetMapper" ref="fieldSetMapper"/>
    </bean>

    <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names" value="id,english,math,literature"/>
    </bean>

    <bean id="fieldSetMapper" class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="targetType" value="me.zeph.spring.batch.model.ExamScore"/>
    </bean>

    <bean id="csvFileProcessor" class="me.zeph.spring.batch.processor.ScoreProcessor"/>

    <bean id="csvFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:/Users/twer/Documents/gradle-project/spring-batch-example/src/main/resources/writer_file.csv"/>
        <property name="lineAggregator" ref="lineAggregator"/>
    </bean>

    <bean id="lineAggregator" class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
        <property name="delimiter" value=","/>
        <property name="fieldExtractor" ref="fieldExtractor"/>
    </bean>

    <bean id="fieldExtractor" class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
        <property name="names" value="id,score"/>
    </bean>

    <batch:job id="fileTransfer">
        <batch:step id="step">
            <batch:tasklet>
                <batch:chunk reader="csvFileReader" processor="csvFileProcessor" writer="csvFileWriter" commit-interval="1"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

</beans>

让我们来一起分析一下上面这个xml配置。

FlatFileItemReader是Flat文件的读取类,可以按行读取输入文件。这里传入两个参数,LineMapper和Resource。

Resource顾名思义,用来指定数据源。

LineMapper用于将文件中的每行数据映射到对应的数据模型,这里使用DefaultLineMapper。LineMapper中传入两个参数,分别是行分词器LineTokenizer和域映射器FieldSetMapper。

对应的LineTokenizer采用基于分隔符的分词器,而对应的FieldSetMapper需要传入待映射的数据模型类型。

对于FlatFileItemWriter,同样要配置对应的输出源Resource和行聚合器lineAggregator。

对于lineAggregator这里同样采用基于分隔符的聚合器DelimitedLineAggregator,需要给聚合器配置对应的分隔符(比如“,”)和域提取器BeanWrapperFieldExtractor。

在域提取器BeanWrapperFieldExtractor中配置要被提取的值。

最后就是定义具体的Batch Job。

spring-batch-jobs.xml
1
2
3
4
5
6
7
<batch:job id="fileTransfer">
        <batch:step id="step">
            <batch:tasklet>
                <batch:chunk reader="csvFileReader" processor="csvFileProcessor" writer="csvFileWriter" commit-interval="1"/>
            </batch:tasklet>
        </batch:step>
</batch:job>

运行结果:上面是输入,下面是输出

1,100,80,80
2,100,70,80
3,100,80,70

1,86.66666666666667
2,83.33333333333333
3,83.33333333333333

Comments