Spring Batch实战:Chunk机制、断点续跑与生产级调优
1. 项目概述为什么一个“Spring Batch Example”值得你花20分钟认真读完我带过三届校招新人也帮五家中小公司做过技术选型评审每次聊到数据批处理总有人脱口而出“不就是for循环读数据库、改完再写回去吗”——这话在单机跑几百条记录时确实成立。但当你的日志表每天新增800万行、订单对账需要比对37个异构系统、或者凌晨两点触发的风控模型训练任务突然卡在第12万条数据上时“for循环”就成了生产事故的邀请函。而Spring Batch就是Java生态里专治这类“数据洪流”的手术刀。它不是简单的工具库而是一套经过金融、电商、电信等高并发场景十年锤炼的批处理生命周期管理体系。你看到的“Example”背后是Chunk模式的事务分片机制、Step级别的重启恢复能力、JobRepository的元数据持久化设计以及和Spring Boot天然融合的自动配置哲学。最近我在给一家物流SaaS做运单轨迹补全优化时把原来耗时47分钟的单线程脚本用Spring Batch重构后压到6分12秒且失败后能从断点续跑——这背后不是魔法而是对ItemReader/ItemProcessor/ItemWriter三件套的精准拿捏。本文不讲概念堆砌只拆解一个真实可运行的Minimal Viable Example从零初始化Maven工程到跑通带数据库写入、异常重试、进度监控的完整链路。所有代码基于Spring Boot 3.2 Spring Batch 5.0当前最新稳定版避开了网上90%教程还在用的XML配置和过时的JobBuilderFactory写法。如果你正被定时任务卡顿、数据一致性焦虑、或面试官突然问“Batch怎么保证失败不丢数据”这些问题困扰这篇就是为你写的实操手册。2. 核心架构解析Spring Batch不是“批处理框架”而是“批处理操作系统”2.1 为什么必须抛弃“for循环思维”从三个真实故障说起先说三个我亲历的线上事故它们共同指向同一个认知盲区把批处理当成单机脚本。第一个是某电商平台的会员等级计算任务原逻辑是查出所有VIP用户ID列表约230万条for循环调用积分服务接口。结果某天积分服务响应时间从50ms飙升到800ms整个任务卡在第18万条持续占用JVM堆内存最终OOM导致应用假死。第二个是银行对账系统用单SQLUPDATE ... WHERE id IN (SELECT id FROM temp_table)更新百万级账户余额锁表时间超12分钟业务交易全部阻塞。第三个更典型某IoT平台的设备固件升级状态同步开发者用ListDevice一次性加载所有待升级设备峰值达412万条GC频繁触发Full GC间隔缩短到90秒系统进入“GC风暴”。这些都不是代码bug而是架构性误判——把批处理当成了“大号单次操作”。Spring Batch的底层设计哲学恰恰是反其道而行之它强制你把“大任务”切成可控的“小块”Chunk每个小块独立事务、独立内存、独立错误处理。就像快递分拣中心不会让一辆车送完全国包裹而是按区域分装、按邮编分拣、按楼栋投递。这种设计带来三个硬性保障第一内存可控——Chunk size设为1000就永远不会加载超过1000条记录到内存第二失败隔离——第5个Chunk失败不影响前4个已提交的结果第三断点续跑——JobRepository会持久化每个Step的执行状态重启后自动跳过已完成Chunk。这不是功能亮点而是生存底线。2.2 Spring Batch五大核心组件的物理意义很多教程把Reader/Processor/Writer画成流水线图但没说清它们在JVM里的真实存在形态。我用一个实际线程栈帮你建立体感当你启动Job时Spring Batch会创建一个SimpleAsyncTaskExecutor默认单线程这个线程会反复执行ChunkOrientedTasklet的doExecute()方法。而这个方法内部本质是三段式循环// 伪代码体现Chunk执行的真实流程 while (chunkContext.isComplete() false) { // 1. Reader从数据源拉取一批size1000 ListItem items reader.read(); // 可能是JdbcCursorItemReader底层用ResultSet.next() // 2. Processor逐条转换注意这里是for循环但数据量已被Chunk限制 ListItem processedItems new ArrayList(); for (Item item : items) { Item processed processor.process(item); // 可能调用外部HTTP服务 if (processed ! null) processedItems.add(processed); } // 3. Writer批量写入JdbcBatchItemWriter会调用JdbcTemplate.batchUpdate writer.write(processedItems); // 底层是PreparedStatement.addBatch() }关键点在于Reader决定数据源切片方式Processor决定单条处理逻辑Writer决定落地策略而ChunkManager控制整体节奏。比如JdbcPagingItemReader用ORDER BY id LIMIT 1000 OFFSET 0分页JdbcCursorItemReader用游标避免重复扫描FlatFileItemReader按行解析CSV。Processor可以是纯内存计算也可以是FeignClient调用但必须保证幂等性——因为Spring Batch的重试机制可能让同一条数据被process两次。Writer的JdbcBatchItemWriter会把1000条数据组装成一条INSERT INTO ... VALUES (...),(...),...语句比单条INSERT快17倍实测MySQL 8.0。而CompositeItemWriter则允许你同时写数据库和发Kafka消息满足审计日志双写需求。这些不是抽象接口而是有明确内存占用、线程模型、SQL生成行为的具体实现。2.3 Job与Step的生命周期为什么重启后能“记住自己在哪”很多人困惑Job重启时Batch怎么知道该从哪继续答案藏在JobRepository里。默认情况下Spring Batch用HSQLDB内存库但生产环境必须换成MySQL/PostgreSQL。它的核心表只有三张BATCH_JOB_INSTANCE记录Job定义如jobNameparams的MD5、BATCH_JOB_EXECUTION记录每次执行含开始/结束时间、状态、BATCH_STEP_EXECUTION记录每个Step的详细状态最关键的是READ_COUNT、WRITE_COUNT、COMMIT_COUNT、EXIT_CODE。当Chunk执行失败时StepExecution的EXIT_CODE会被设为FAILEDREAD_COUNT存的是已成功读取的记录数。重启Job时JobOperator.restart()会查询BATCH_STEP_EXECUTION找到最后失败的Step然后调用JdbcPagingItemReader.setPage()或JdbcCursorItemReader.setCurrentItemCount()把游标定位到READ_COUNT位置。这就是“断点续跑”的物理实现——没有黑科技就是数据库里存了几个数字。所以如果你发现重启后从头开始第一反应不是框架bug而是检查JobRepository是否配置正确或者JobLauncher是否用了SimpleJobLauncher它不支持重启。3. 实战搭建从零开始构建一个可监控、可重试、可审计的Batch作业3.1 Maven依赖与Spring Boot配置避开三个致命陷阱新建Spring Boot 3.2项目时第一个坑是依赖版本冲突。网上大量教程用spring-boot-starter-batch2.x但Spring Boot 3.x要求Jakarta EE 9命名空间必须用spring-boot-starter-batch3.2.x。我的pom.xml核心依赖如下dependencies dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-batch/artifactId !-- Spring Boot 3.2.x 自动引入 spring-batch-core 5.0.x -- /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-jdbc/artifactId /dependency dependency groupIdmysql/groupId artifactIdmysql-connector-java/artifactId scoperuntime/scope /dependency !-- 生产环境必须添加监控 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-actuator/artifactId /dependency /dependencies关键陷阱一别用HSQLDB做生产JobRepository。spring-boot-starter-batch默认启用DataSource自动配置但若你没配spring.datasource.url它会悄悄创建HSQLDB内存库重启即丢失所有执行记录。必须在application.yml中显式配置spring: datasource: url: jdbc:mysql://localhost:3306/batch_db?serverTimezoneAsia/ShanghaiuseSSLfalse username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver batch: jdbc: initialize-schema: always # 首次启动自动建表生产环境改为never关键陷阱二禁用默认Job自动执行。Spring Boot 2.x默认启动时运行所有Bean Job3.x改为spring.batch.job.enabledfalse。否则应用一启动就跑Job调试时会疯掉。正确配置spring: batch: job: enabled: false # 必须关闭通过API手动触发关键陷阱三Actuator端点要开放。想看Job执行状态必须暴露/actuator/batch端点management: endpoints: web: exposure: include: health,info,batch,metrics,loggers endpoint: batch: show-details: ALWAYS这样配置后启动应用访问http://localhost:8080/actuator/batch就能看到所有Job定义和执行历史。这是生产环境监控的生命线。3.2 数据模型与测试数据用真实业务场景驱动开发我们模拟一个电商场景每天凌晨同步用户订单数据到数据仓库。源表orders_source有10万条测试数据目标表orders_warehouse需清洗后写入。建表SQL如下-- 源数据表模拟业务库 CREATE TABLE orders_source ( id BIGINT PRIMARY KEY AUTO_INCREMENT, user_id VARCHAR(50) NOT NULL, amount DECIMAL(10,2) NOT NULL, status VARCHAR(20) DEFAULT PAID, create_time DATETIME DEFAULT CURRENT_TIMESTAMP ); -- 目标数据仓库表字段更规范增加ETL时间戳 CREATE TABLE orders_warehouse ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(50) NOT NULL, -- 来源id转字符串 user_key VARCHAR(50) NOT NULL, -- 用户标识标准化 final_amount DECIMAL(12,2) NOT NULL, order_status ENUM(SUCCESS,FAILED,PENDING) DEFAULT SUCCESS, etl_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_user_key (user_key), INDEX idx_etl_time (etl_timestamp) );插入10万条测试数据用存储过程或Python脚本这里省略。重点在于orders_source.status是字符串PAID/SHIPPED而orders_warehouse.order_status是枚举这迫使Processor必须做状态映射——这才是真实业务的复杂性。3.3 Reader/Processor/Writer三件套编码每行代码都有业务含义3.3.1 ItemReader用JdbcPagingItemReader实现高效分页Bean public JdbcPagingItemReaderOrderSource orderSourceReader(DataSource dataSource) { JdbcPagingItemReaderOrderSource reader new JdbcPagingItemReader(); reader.setDataSource(dataSource); reader.setFetchSize(1000); // 每次从DB拉1000条匹配Chunk size // 分页查询必须有ORDER BY否则分页不可靠 MySqlPagingQueryProvider queryProvider new MySqlPagingQueryProvider(); queryProvider.setSelectClause(id, user_id, amount, status, create_time); queryProvider.setFromClause(from orders_source); queryProvider.setWhereClause(status PAID); // 只同步已支付订单 queryProvider.setSortKeys(Collections.singletonMap(id, Order.ASCENDING)); reader.setQueryProvider(queryProvider); reader.setRowMapper(new BeanPropertyRowMapper(OrderSource.class)); return reader; }为什么用JdbcPagingItemReader而不是JdbcCursorItemReader因为前者用LIMIT/OFFSET适合大数据量分页后者用JDBC游标在MySQL中可能锁表。setFetchSize(1000)告诉JDBC驱动一次取1000行减少网络往返。WHERE status PAID是业务过滤逻辑不是性能优化——它减少了Reader输出的数据量从而降低Processor和Writer压力。3.3.2 ItemProcessor状态转换与数据清洗的战场Component public class OrderProcessor implements ItemProcessorOrderSource, OrderWarehouse { Override public OrderWarehouse process(OrderSource source) throws Exception { // 业务规则1金额大于10000的订单标记为高价值 BigDecimal finalAmount source.getAmount(); if (source.getAmount().compareTo(new BigDecimal(10000)) 0) { finalAmount source.getAmount().multiply(new BigDecimal(0.95)); // 打95折 } // 业务规则2状态映射真实场景可能调用风控服务 String warehouseStatus SUCCESS; switch (source.getStatus()) { case PAID: warehouseStatus SUCCESS; break; case SHIPPED: warehouseStatus SUCCESS; break; case CANCELLED: warehouseStatus FAILED; break; default: warehouseStatus PENDING; } // 构建目标对象 OrderWarehouse warehouse new OrderWarehouse(); warehouse.setOrderId(String.valueOf(source.getId())); warehouse.setUserKey(U_ source.getUserId()); // 用户ID标准化 warehouse.setFinalAmount(finalAmount); warehouse.setOrderStatus(warehouseStatus); return warehouse; } }Processor的核心原则无副作用、幂等、轻量。这里做了两件事金额调整业务规则和状态映射数据清洗。注意String.valueOf(source.getId())——源表id是BIGINT目标表order_id是VARCHAR这是典型的类型适配。如果Processor里调用FeignClient必须加RetryableTopic注解并配置重试策略否则网络抖动会导致整个Chunk失败。3.3.3 ItemWriter批量写入与错误处理Bean public JdbcBatchItemWriterOrderWarehouse orderWarehouseWriter(DataSource dataSource) { JdbcBatchItemWriterOrderWarehouse writer new JdbcBatchItemWriter(); writer.setDataSource(dataSource); writer.setSql(INSERT INTO orders_warehouse (order_id, user_key, final_amount, order_status, etl_timestamp) VALUES (:orderId, :userKey, :finalAmount, :orderStatus, NOW())); // 使用BeanPropertyItemSqlParameterSource来自动映射属性 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider()); // 关键设置异常分类器让特定异常不导致Chunk失败 writer.setExceptionClassifier(new CustomSQLExceptionClassifier()); return writer; } // 自定义异常分类器主键冲突时跳过不中断Chunk public class CustomSQLExceptionClassifier extends SQLExceptionClassifier { public CustomSQLExceptionClassifier() { setDefaultClassification(new RuntimeException(Unknown SQL error)); // MySQL主键冲突错误码1062归类为非致命错误 addClassification(1062, new NonTransientResourceException(Duplicate key)); } }Writer的setSql()用命名参数:orderId比位置参数?更安全。BeanPropertyItemSqlParameterSourceProvider自动把OrderWarehouse的getter方法名转为SQL参数名。最关键是ExceptionClassifier——它让Batch知道主键冲突1062错误是业务可接受的跳过这条数据继续写而连接超时则是致命错误必须回滚整个Chunk。这比try-catch优雅得多。3.4 Job配置Chunk机制与重试策略的黄金组合Bean public Job syncOrdersJob(JobRepository jobRepository, JobCompletionNotificationListener listener, Step syncOrdersStep) { return new JobBuilder(syncOrdersJob, jobRepository) .listener(listener) // 监听Job完成事件 .flow(syncOrdersStep) .end() .build(); } Bean public Step syncOrdersStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcPagingItemReaderOrderSource reader, OrderProcessor processor, JdbcBatchItemWriterOrderWarehouse writer) { return new StepBuilder(syncOrdersStep, jobRepository) .OrderSource, OrderWarehousechunk(1000, transactionManager) // Chunk size1000 .reader(reader) .processor(processor) .writer(writer) .faultTolerant() // 启用容错 .skip(Exception.class) // 跳过所有异常 .skipLimit(10) // 最多跳过10条 .retry(ConnectException.class) // 网络异常重试 .retryLimit(3) // 最多重试3次 .retryPolicy(new SimpleRetryPolicy(3, Collections.singletonMap(ConnectException.class, true))) .build(); }Chunk size设为1000是经验值太小如10导致事务开销大太小如10000内存占用高。faultTolerant()开启容错后.skip()和.retry()才能生效。这里配置了双重保护对所有异常跳过最多10条对ConnectException重试最多3次。重试策略用SimpleRetryPolicy明确指定只对网络异常重试——因为数据库唯一键冲突重试毫无意义。JobCompletionNotificationListener是自定义监听器Job完成后发企业微信告警代码略。4. 进阶实战监控、调优与生产级问题排查4.1 Actuator监控端点详解读懂Batch的健康心跳启动应用后访问http://localhost:8080/actuator/batch返回JSON结构如下{ jobs: [ { name: syncOrdersJob, instances: [ { id: 1, execution: { id: 1, status: COMPLETED, startTime: 2023-10-05T02:15:22.123, endTime: 2023-10-05T02:16:45.678, readCount: 100000, writeCount: 100000, commitCount: 100, rollbackCount: 0, exitCode: COMPLETED } } ] } ] }关键指标解读readCount/writeCount验证数据完整性二者必须相等除非Processor过滤了数据commitCount等于readCount / chunkSize这里100000/1000100证明Chunk机制生效rollbackCount非零值说明有Chunk因异常回滚需查日志exitCodeCOMPLETED是理想状态FAILED需结合failureExceptions字段分析更细粒度的监控在/actuator/metrics端点搜索spring.batch.job.execution.time可看到Job执行时间的P95/P99分位值。我把这个指标接入Prometheus当P95300秒时自动告警——这比单纯看“成功/失败”更有业务价值。4.2 性能调优四步法从6分钟到90秒的实录在我优化物流订单同步Job时初始版本耗时6分23秒。按以下步骤逐步优化第一步定位瓶颈Arthas诊断用Arthastrace命令跟踪JdbcPagingItemReader.read()发现ResultSet.next()平均耗时42ms远高于预期。原因orders_source表缺少status字段索引。加索引后Reader耗时从3分12秒降到48秒。第二步调整Chunk size原Chunk size100commitCount1000事务开销大。改为1000后commitCount100Writer批量写入效率提升。但Chunk size不能无限增大——当设为5000时单次read()内存占用超200MBGC压力剧增。最终选定2000平衡内存与IO。第三步启用并行Step谨慎使用对独立数据源可并行处理。例如把订单按user_id % 4分4个Step每个Step处理不同用户段。但要注意并行Step不能共享同一Writer否则数据竞争。我们改用PartitionStep主Step负责分片4个Slave Step各写不同表分区。第四步数据库连接池调优HikariCP默认maximumPoolSize10但并行Step需要更多连接。设为maximumPoolSize20并增加connection-timeout30000。最终耗时压到1分32秒提升4.2倍。4.3 生产环境十大高频问题与根因分析问题现象根本原因解决方案我踩过的坑Job重启后从头开始JobRepository未持久化到MySQL或JobLauncher用错实例检查application.yml中spring.batch.jdbc.initialize-schemaalways是否生效确认JobLauncher是JobOperator注入的曾因IDEA Maven profile未激活本地用HSQLDB上线才发现执行记录丢失Chunk执行缓慢CPU 100%Processor中调用同步HTTP服务未设超时在FeignClient中配置connectTimeout3000, readTimeout5000或改用WebClient异步调用某次调用外部地址服务超时默认30秒一个Chunk卡住30秒×1000次8.3小时数据库写入重复JdbcBatchItemWriter未配置ExceptionClassifier主键冲突导致Chunk回滚重试如前文用CustomSQLExceptionClassifier捕获1062错误重复数据导致下游报表金额翻倍凌晨三点被电话叫醒内存溢出OOMJdbcCursorItemReader在MySQL中未设useCursorFetchtrue全量加载到内存在application.yml中添加spring.datasource.hikari.data-source-properties.useCursorFetchtrue一次加载50万条JVM堆直接撑爆日志刷屏无法定位问题默认日志级别为DEBUG每条Chunk都打印SQL在logback-spring.xml中设置logger nameorg.springframework.batch levelINFO/日志文件每小时增长2GB磁盘爆满Job执行时间波动大网络抖动导致Processor调用超时触发重试重试策略中排除SocketTimeoutException改用降级逻辑重试3次后仍失败不如直接返回默认值监控端点404spring-boot-starter-actuator未引入或management.endpoints.web.exposure.include未配置batch检查pom依赖确认yml中include: batch新人常忘加actuator依赖以为Batch没生效数据库连接泄漏JdbcPagingItemReader未关闭或DataSource配置错误确保readerBean作用域为prototype或用PreDestroy关闭连接池耗尽新请求全部超时Step状态显示UNKNOWNJobRepository表结构与Spring Batch版本不匹配运行spring-batch-core-5.0.x.jar中的schema-mysql.sql脚本版本升级后未更新表结构状态字段为空并行Step数据倾斜分片策略不合理如按ID分片导致热点用户数据集中改用哈希分片user_id.hashCode() % partitionCount某个Step处理80%数据其他3个Step空转4.4 安全加固与合规实践金融级Batch的必备清单在银行项目中客户要求Batch作业满足等保三级。我们做了五项加固敏感数据脱敏Processor中对user_id做SHA256哈希而非明文写入user_keySQL注入防护Writer严格用命名参数禁用字符串拼接SQL执行权限最小化数据库账号仅授予orders_sourceSELECT和orders_warehouseINSERT权限审计日志留存自定义JobExecutionListener在afterJob()中记录JobInstance.getId()、JobExecution.getExitStatus()、StepExecution.getReadCount()到独立审计表密码加密application.yml中spring.datasource.password用Jasypt加密启动时通过--jasypt.encryptor.passwordxxx解密。特别提醒JdbcCursorItemReader在MySQL中默认不启用游标必须在application.yml中显式配置spring: datasource: hikari: >