在大型企业中,由于业务复杂、数据量大、数据格式不同、数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理。而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理。这样的过程就是 "批处理"。
批处理应用通常有以下特点:
Spring batch 是一个轻量级的全面的批处理框架,它专为大型企业而设计,帮助开发健壮的批处理应用。Spring batch 为处理大批量数据提供了很多必要的可重用的功能,比如日志追踪、事务管理、job 执行统计、重启 job 和资源管理等。同时它也提供了优化和分片技术用于实现高性能的批处理任务。
它的核心功能包括:
笔者所在的部门属于国外某大型金融公司的 CRM 部门,在日常工作中我们经常需要开发一些批处理应用,对 Spring Batch 有着丰富的使用经验。近段时间笔者特意总结了这些经验。
在使用 Spring Batch 时推荐使用最新的 Spring Batch 3.0 版本。相比 Spring Batch2.2,它做了以下方面的提升:
支持 Spring4 和 Java8 是一个重大的提升。这样就可以使用 Spring4 引入的 Spring boot 组件,从而开发效率方面有了一个质的飞跃。引入 Spring-batch 框架只需要在 build.gradle 中加入一行代码即可:
|
|
而增强 Spring Batch Integration 的功能后,我们就可以很方便的和 Spring 家族的其他组件集成,还可以以多种方式来调用 job,也支持远程分区操作以及远程块处理。
而支持 JobScope 后我们可以随时为对象注入当前 Job 实例的上下文信息。只要我们制定 Bean 的 scope 为 job scope,那么就可以随时使用 jobParameters 和 jobExecutionContext 等信息。
|
|
之前我们在配置 job 和 step 的时候都习惯用 xml 的配置方式,但是随着时间的推移发现问题颇多。
我们渐渐发现使用纯 Java 类的配置方式更灵活,它是类型安全的,而且 IDE 的支持更好。在构建 job 或 step 时采用的流式语法相比 xml 更加简洁易懂。
|
|
在这个例子中可以很清楚的看到该 step 的配置,比如 reader/processor/writer 组件,以及配置了哪些 listener 等。
Spring batch 在运行时需要数据库支持,因为它需要在数据库中建立一套 schema 来存储 job 和 step 运行的统计信息。而在本地集成测试中我们可以借助 Spring batch 提供的内存 Repository 来存储 Spring batch 的任务执行信息,这样即避免了在本地配置一个数据库,又可以加快 job 的执行。
|
|
我们在 build.gradle 中加入对 hsqldb 的依赖:
|
|
然后在测试类中添加对 DataSource 的配置。
|
|
并且在 applicaton.properties 配置中添加初始化 Database 的配置:
|
|
Spring batch 在配置 Step 时采用的是基于 Chunk 的机制。即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给 writer 进行写入操作。这样可以最大化的优化写入效率,整个事务也是基于 Chunk 来进行。
当我们在需要将数据写入到文件、数据库中之类的操作时可以适当设置 Chunk 的值以满足写入效率最大化。但有些场景下我们的写入操作其实是调用一个 web service 或者将消息发送到某个消息队列中,那么这些场景下我们就需要设置 Chunk 的值为 1,这样既可以及时的处理写入,也不会由于整个 Chunk 中发生异常后,在重试时出现重复调用服务或者重复发送消息的情况。
Spring batch 提供了大量的 Listener 来对 job 的各个执行环节进行全面的监控。
在 job 层面 Spring batch 提供了 JobExecutionListener 接口,其支持在 Job 开始或结束时进行一些额外处理。在 step 层面 Spring batch 提供了 StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener 等接口,同时对 Retry 和 Skip 操作也提供了 RetryListener 及 SkipListener。
通常我们会为每个 job 都实现一个 JobExecutionListener,在 afterJob 操作中我们输出 job 的执行信息,包括执行时间、job 参数、退出代码、执行的 step 以及每个 step 的详细信息。这样无论是开发、测试还是运维人员对整个 job 的执行情况了如指掌。
如果某个 step 会发生 skip 的操作,我们也会为其实现一个 SkipListener,并在其中记录 skip 的数据条目,用于下一步的处理。
实现 Listener 有两种方式,一种是继承自相应的接口,比如继承 JobExecutionListener 接口,另一种是使用 annoation(注解)的方式。经过实践我们认为使用注解的方式更好一些,因为使用接口你需要实现接口的所有方法,而使用注解则只需要对相应的方法添加 annoation 即可。
下面的这个类采用了继承接口的方式,我们看到其实我们只用到了第一个方法,第二个和第三个都没有用到。但是我们必须提供一个空的实现。
|
|
而使用 annoation 的方式可以简写为:
|
|
在处理百万级的数据过程过程中难免会出现异常。如果一旦出现异常而导致整个批处理工作终止的话那么会导致后续的数据无法被处理。Spring Batch 内置了 Retry(重试)和 Skip(跳过)机制帮助我们轻松处理各种异常。适合 Retry 的异常的特点是这些异常可能会随着时间推移而消失,比如数据库目前有锁无法写入、web 服务当前不可用、web 服务满载等。所以对这些异常我们可以配置 Retry 机制。而有些异常则不应该配置 Retry,比如解析文件出现异常等,因为这些异常即使 Retry 也会始终失败。
即使 Retry 多次仍然失败也无需让整个 step 失败,可以对指定的异常设置 Skip 选项从而保证后续的数据能够被继续处理。我们也可以配置 SkipLimit 选项保证当 Skip 的数据条目达到一定数量后及时终止整个 Job。
有时候我们需要在每次 Retry 中间隔做一些操作,比如延长 Retry 时间,恢复操作现场等,Spring Batch 提供了 BackOffPolicy 来达到目的。下面是一个配置了 Retry 机制、Skip 机制以及 BackOffPolicy 的 step 示例。
|
|
在 Job 执行过程中不一定都是顺序执行的,我们经常需要根据某个 job 的输出数据或执行结果来决定下一步的走向。以前我们会把一些判断放置在下游 step 中进行,这样可能会导致有些 step 实际运行了,但其实并没有做任何事情。比如一个 step 执行过程中会将失败的数据条目记录到一个报告中,而下一个 step 会判断有没有生成报告,如果生成了报告则将该报告发送给指定联系人,如果没有则不做任何事情。这种情况下可以通过 Decider 机制来实现 Job 的执行流程。在 Spring batch 3.0 中 Decider 已经从 Step 中独立出来,和 Step 处于同一级别。
|
|
而在 job 配置中可以这样来使用 Decider。这样整个 Job 的执行流程会更加清晰易懂。
|
|
批处理工作处理的数据量大,而执行窗口一般又要求比较小。所以必须要通过多种方式来加速 Job 的执行。一般我们有四种方式来实现:
在单个 step 多线程执行任务可以借助于 taskExecutor 来实现。这种情况适合于 reader、writer 是线程安全的并且是无状态的场景。我们还可以设置线程数量。
|
|
上述示例中的 tasklet 需要实现 TaskExecutor,Spring Batch 提供了一个简单的多线程 TaskExecutor 供我们使用:SimpleAsyncTaskExecutor。
并行执行不同的 Step 在 Spring batch 中很容易实现,以下是一个示例:
|
|
在这个示例中我们先执行 step1,然后并行执行 flow1 和 flow2,最后再执行 step3。
Spring batch 提供了 PartitionStep 来实现对同一个 step 在多个进程中实现并行处理。通过 PartitonStep 再配合 PartitionHandler 可以将一个 step 扩展到多个 Slave 上实现并行运行。
远程执行 Chunk 任务则是将某个 Step 的 processer 操作分割到多个进程中,多个进程通过一些中间件进行通讯(比如采用消息的方式)。这种方式适合于 Processer 是瓶颈而 Reader 和 Writer 不是瓶颈的场景。
Spring Batch 对批处理场景进行了合理的抽象,封装了大量的实用功能,使用它来开发批处理应用可以达到事半功倍的效果。在使用的过程中我们仍需要坚持总结一些最佳实践,从而能够交付高质量的可维护的批处理应用,满足企业级应用的苛刻要求。
来源: http://www.cnblogs.com/huang0925/p/6134435.html