关键点:为什么塔内要逐行读取?
SQL文件可能很大单个SQL文件可能达到几百MB如50万行数据如果一次性读取内存占用过高100MB文件加载需要几百MB内存而且多线程处理更容易造成OOMGC压力大大对象频繁创建和回收原因二无法按普通分号切割如果用;切割会出错// ❌ 错误做法 String[] sqls allContent.split(;); // 会误切数据里的分号正确做法逐行拼接遇到;#END#才算完整// ✅ 正确做法 StringBuilder currentSql new StringBuilder(); while ((line reader.readLine()) ! null) { currentSql.append(line); if (currentSql.toString().endsWith(;#END#)) { String sql currentSql.toString().replace(;#END#, ;); executeBatch(sql); currentSql.setLength(0); // 清空准备下一条 } }SQL文件格式示例DELETE FROM table WHERE id 1;#END# INSERT INTO table VALUES (1, data;with;semicolons);#END# INSERT INTO table VALUES (2, line1\nline2);#END#第三难同步策略多样化怎么灵活配置背景四种同步策略同步策略适用场景SQL操作数据范围全表同步基础配置表数据量小千行级TRUNCATEINSERT整张表的所有数据公司级条件同步按公司维度管理的表DELETE WHERE company_id?INSERT单个公司的所有数据店铺级增量同步有软删除标记和更新时间的表DELETE WHERE shop_id? AND ...INSERT单店铺增量数据店铺级全量同步物理删除的表DELETE WHERE shop_id?INSERT单店铺全部数据问题100张表里四种策略混杂查询条件各不相同。需要灵活配置每张表的同步策略和WHERE条件。解决方案配置驱动 占位符核心思想把同步策略、查询条件放到配置表里每张表单独配置配置表设计CREATE TABLE sync_config ( id int PRIMARY KEY, table_name varchar(100), table_level varchar(20), -- company/shop sync_type int, -- 0:全表, 1:条件同步 where_condition text, -- WHERE条件模板支持占位符 delete_strategy varchar(20) -- TRUNCATE/DELETE );配置示例-- 全表同步 INSERT INTO sync_config VALUES (1, sys_config, company, 0, NULL, TRUNCATE); -- 公司级条件同步 INSERT INTO sync_config VALUES (2, company_settings, company, 1, company_id {companyId} AND status 1, DELETE); -- 店铺级增量同步 INSERT INTO sync_config VALUES (3, user_table, shop, 1, shop_id {shopId} AND update_time {lastTime}, DELETE); -- 店铺级全量同步 INSERT INTO sync_config VALUES (4, order_table, shop, 1, shop_id {shopId}, DELETE);占位符替换逻辑private String buildWhereCondition(String template, SyncContext ctx) { if (template null) return ; // 全表同步无WHERE条件 return template .replace({shopId}, String.valueOf(ctx.getShopId())) .replace({companyId}, String.valueOf(ctx.getCompanyId())) .replace({lastTime}, ctx.getLastSyncTime()); }SQL生成过程以店铺级增量同步为例步骤1构造查询SQL// 占位符替换后得到WHERE条件 String whereCondition shop_id 123 AND update_time 2025-01-15 00:00:00; // 构造SELECT语句 String selectSql SELECT * FROM user_table WHERE whereCondition;步骤2流式读取并生成SQL文件关键点从ResultSet元数据动态获取字段而非写死字段名try (ResultSet rs stmt.executeQuery(selectSql)) { ResultSetMetaData metadata rs.getMetaData(); int columnCount metadata.getColumnCount(); // 从元数据获取列名列表 ListString columnNames new ArrayList(); for (int i 1; i columnCount; i) { columnNames.add(metadata.getColumnName(i)); } // 1. 先写DELETE语句 writer.write(DELETE FROM user_table WHERE whereCondition ;#END#); writer.write(System.lineSeparator()); // 2. 构造INSERT语句头部字段名从元数据获取 String insertHeader INSERT INTO user_table ( String.join(, , columnNames) ) VALUES\n; StringBuilder values new StringBuilder(); int batchCount 0; // 3. 流式读取数据并拼接VALUES while (rs.next()) { values.append((); for (int i 1; i columnCount; i) { if (i 1) values.append(, ); // 根据字段类型格式化值动态处理 values.append(formatValue(rs, i, metadata.getColumnType(i))); } values.append()); batchCount; // 每10行生成一条INSERT if (batchCount 10) { writer.write(insertHeader values.toString() ;#END#); writer.write(System.lineSeparator()); values.setLength(0); batchCount 0; } else { values.append(, ); } } // 4. 处理剩余数据 if (batchCount 0) { writer.write(insertHeader values.toString() ;#END#); } }最终生成的SQL文件DELETE FROM user_table WHERE shop_id 123 AND update_time 2025-01-15 00:00:00;#END# INSERT INTO user_table (id, shop_id, username, update_time) VALUES (1, 123, Alice, 2025-01-16 10:00:00), (2, 123, Bob, 2025-01-16 11:00:00);#END#优势总结✅灵活性四种策略自由配置满足不同表的需求✅可扩展新增表只需加配置代码零改动✅占位符支持{shopId}、{companyId}、{lastTime}等动态参数✅零硬编码字段名从元数据动态获取适配任意表结构第四难单表50W数据如何防止OOM问题传统方式的内存杀手// 反面教材一次性加载全部数据 String sql SELECT * FROM huge_table WHERE shop_id 123; ListMapString, Object allRows jdbcTemplate.queryForList(sql); // 直接OOM单店铺单表可能50W行全部加载到内存会导致OutOfMemoryError。解决方案流式读取 临时文件MySQL流式读取private void generateSQL(DataSource ds, String sql) throws SQLException { try (Connection conn ds.getConnection(); Statement stmt conn.createStatement( ResultSet.TYPE_FORWARD_ONLY, // 只向前遍历 ResultSet.CONCUR_READ_ONLY)) { // 只读模式 // 核心启用MySQL流式读取 stmt.setFetchSize(Integer.MIN_VALUE); // MySQL JDBC特殊约定 try (ResultSet rs stmt.executeQuery(sql)) { int batchCount 0; StringBuilder sqlValues new StringBuilder(); while (rs.next()) { // 逐行处理 sqlValues.append((); for (int i 1; i columnCount; i) { sqlValues.append(formatValue(rs, i)); } sqlValues.append()); batchCount; // 每10行生成一条INSERT if (batchCount 10) { writeInsert(sqlValues.toString()); sqlValues.setLength(0); // 清空缓冲 batchCount 0; } } } } }核心技巧stmt.setFetchSize(Integer.MIN_VALUE)MySQL JDBC的特殊约定启用服务器端游标每次只拉取1行数据到客户端内存占用恒定批量拼接VALUES多行生成一条INSERT减少SQL数量MongoDB流式读取CloseableIteratorDocument iterator mongoTemplate.stream(query, Document.class, collectionName); try { while (iterator.hasNext()) { Document doc iterator.next(); // 逐文档处理 processDocument(doc); } } finally { iterator.close(); // ⚠️ 必须手动关闭否则连接泄漏 }塔内执行流式读取try (BufferedReader reader new BufferedReader( new InputStreamReader(ossStream))) { ListString sqlBatch new ArrayList(); StringBuilder currentSql new StringBuilder(); String line; while ((line reader.readLine()) ! null) { // 拼接当前行 currentSql.append(line); // 检查是否是完整的SQL以;#END#结尾 if (currentSql.toString().endsWith(;#END#)) { // 还原特殊符号 → 正常分号 String realSql currentSql.toString().replace(;#END#, ;); // 添加到批次 sqlBatch.add(realSql); currentSql.setLength(0); // 清空准备下一条SQL // 批量执行每100条一批塔外10条数据构造成1个insert语句 if (sqlBatch.size() 100) { executeBatch(stmt, sqlBatch); sqlBatch.clear(); } } } // 执行剩余SQL if (!sqlBatch.isEmpty()) { executeBatch(stmt, sqlBatch); } // 关键自动提交避免事务过大 conn.setAutoCommit(true); }为什么setAutoCommit(true)