所有的Batch处理都可以以一种最简单的方式描述,那就是读取大量的数据,然后进行某种方式的计算或者变形,最后输出写入到某个存储系统中。Spring Batch提供了三个简单的接口来帮助执行大量数据的读写,分别是ItemReader、ItmeWriter和ItermProcessor,你可以说它们是抽象出来的一种简单概念,它们同样也是针对不同类型数据的输入、输出和处理的方法。
我们最常用到的数据类型有三种:数据库、XML和Flat文件。为了能够清晰的描述Batch处理的流程,我们采用Flat这种文件作为输入和输出数据源,排除数据库操作和XML文件解析的带来的理解上的复杂度。
“Flat文件是一种包含没有相对关系结构的记录的文件。这个类型通常用来描述文字处理、其他结构字符或标记被移除了的文本。在任何事件中,许多用户把保存成“纯文本(text only)”类型的Microsoft Word文档叫做“Flat File(flat file)”。Flat File的另一种形式是包含了用ASCII码记录的,每个表单元由逗号分隔,用行表示记录组的文件。这种文件页叫做用逗号分隔数值(CSV)的文件.”
这里给出一个例子,从一个csv格式的flat文件中读取英语、数学、文学的分数,然后计算平均分,将结果写入另一个csv格式文件。
首先是两个Model,分别是输入的数据模型ExamScore和输出的数据模型AverageScore。
ExamScore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| package me.zeph.spring.batch.model;
public class ExamScore {
private int id;
private int english;
private int math;
private int literature;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getEnglish() {
return english;
}
public void setEnglish(int english) {
this.english = english;
}
public int getMath() {
return math;
}
public void setMath(int math) {
this.math = math;
}
public int getLiterature() {
return literature;
}
public void setLiterature(int literature) {
this.literature = literature;
}
}
|
AverageScore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| package me.zeph.spring.batch.model;
public class AverageScore {
private int id;
private double score;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public double getScore() {
return score;
}
public void setScore(double score) {
this.score = score;
}
}
|
然后是Processor,这里实现ItemProcessor接口。在process方法中传入输入数据模型,返回输出数据模型。
ScoreProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| package me.zeph.spring.batch.processor;
import me.zeph.spring.batch.model.AverageScore;
import me.zeph.spring.batch.model.ExamScore;
import org.springframework.batch.item.ItemProcessor;
public class ScoreProcessor implements ItemProcessor<ExamScore, AverageScore> {
public static final double COUNT = 3.0;
@Override
public AverageScore process(ExamScore examScore) throws Exception {
AverageScore averageScore = new AverageScore();
averageScore.setId(examScore.getId());
averageScore.setScore(calculateAverageScore(examScore));
return averageScore;
}
private double calculateAverageScore(ExamScore examScore) {
return (examScore.getEnglish() + examScore.getLiterature() + examScore.getMath()) / COUNT;
}
}
|
和上篇文章的一样,在spring-batch-beans.xml中定义了关于Sping Batch的一些基础Bean。
spring-batch-beans.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
</beans>
|
重点是下面一个xml文件spring-batch-jobs.xml,在这个里面定义所有关于Batch Job的具体内容。
spring-batch-jobs.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">
<import resource="spring-batch-beans.xml"/>
<bean id="csvFileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:/Users/twer/Documents/gradle-project/spring-batch-example/src/main/resources/reader_file.csv"/>
<property name="lineMapper" ref="lineMapper"/>
</bean>
<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer" ref="lineTokenizer"/>
<property name="fieldSetMapper" ref="fieldSetMapper"/>
</bean>
<bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value=","/>
<property name="names" value="id,english,math,literature"/>
</bean>
<bean id="fieldSetMapper" class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="targetType" value="me.zeph.spring.batch.model.ExamScore"/>
</bean>
<bean id="csvFileProcessor" class="me.zeph.spring.batch.processor.ScoreProcessor"/>
<bean id="csvFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:/Users/twer/Documents/gradle-project/spring-batch-example/src/main/resources/writer_file.csv"/>
<property name="lineAggregator" ref="lineAggregator"/>
</bean>
<bean id="lineAggregator" class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor" ref="fieldExtractor"/>
</bean>
<bean id="fieldExtractor" class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="id,score"/>
</bean>
<batch:job id="fileTransfer">
<batch:step id="step">
<batch:tasklet>
<batch:chunk reader="csvFileReader" processor="csvFileProcessor" writer="csvFileWriter" commit-interval="1"/>
</batch:tasklet>
</batch:step>
</batch:job>
</beans>
|
让我们来一起分析一下上面这个xml配置。
FlatFileItemReader是Flat文件的读取类,可以按行读取输入文件。这里传入两个参数,LineMapper和Resource。
Resource顾名思义,用来指定数据源。
LineMapper用于将文件中的每行数据映射到对应的数据模型,这里使用DefaultLineMapper。LineMapper中传入两个参数,分别是行分词器LineTokenizer和域映射器FieldSetMapper。
对应的LineTokenizer采用基于分隔符的分词器,而对应的FieldSetMapper需要传入待映射的数据模型类型。
对于FlatFileItemWriter,同样要配置对应的输出源Resource和行聚合器lineAggregator。
对于lineAggregator这里同样采用基于分隔符的聚合器DelimitedLineAggregator,需要给聚合器配置对应的分隔符(比如“,”)和域提取器BeanWrapperFieldExtractor。
在域提取器BeanWrapperFieldExtractor中配置要被提取的值。
最后就是定义具体的Batch Job。
spring-batch-jobs.xml
1
2
3
4
5
6
7
| <batch:job id="fileTransfer">
<batch:step id="step">
<batch:tasklet>
<batch:chunk reader="csvFileReader" processor="csvFileProcessor" writer="csvFileWriter" commit-interval="1"/>
</batch:tasklet>
</batch:step>
</batch:job>
|
运行结果:上面是输入,下面是输出
1,100,80,80
2,100,70,80
3,100,80,70
1,86.66666666666667
2,83.33333333333333
3,83.33333333333333