个月搞定100+表迁移:我的“偷师”Navicat实战复盘
一个堪称社死的工期还记得那天老板把我叫到办公室递过来一份需求文档下个月要把项目迁移到新平台数据这块你来搞定。我打开文档扫了一眼差点当场石化需求清单100张数据表要迁移还要支持后续动态新增双链路同步MySQL到MySQL、MongoDB到PostgreSQL不能写死配置要能灵活扩展工期不到1个月技术约束源环境塔外和目标环境塔内网络完全隔离塔外只能读源库无法访问目标库塔内只能写目标库无法访问源库两端唯一的桥梁阿里云OSS塔外只能写塔内可以读写塔内不支持MongoDB必须用PostgreSQL替代数据规模单表最大1000万行数据单店铺单表50万行涉及1000个店铺总计100张表那一刻我脑海里浮现的画面是在公司地下室疯狂写MyBatisselect、insert语句直到猝死...但最终我不仅提前5天完成迁移还搞出了一套能让后续表秒级上线的全自动化流水线。怎么做到的答案就藏在Navicat的导入/导出功能里——直接构造SQL文件上传OSS塔内执行复杂逻辑全都在塔外处理一眼望去的七大技术难点在开始动手前我先梳理了一下面临的挑战难点1表结构千差万别100张表每张表的字段、类型、主键都不一样。传统MyBatis方式意味着要写100个Mapper、100个实体类。后续新增表还得继续写代码复用度≈0。难点2同步策略多样化100张表需要支持四种同步策略条件各不相同全表同步基础配置表数据量小TRUNCATE后一次性插入全部数据公司级条件同步按company_id维度同步支持条件过滤店铺级增量同步有is_deleted和update_time的表按shop_id时间条件增量同步店铺级全量同步物理删除的表按shop_id维度全量同步单店铺数据每张表的策略和条件都不同需要支持灵活配置。难点3数据内容包含特殊字符某些字段的内容包含分号、单引号等SQL特殊字符如果不处理生成的SQL文件会在执行时语法报错。难点4超大数据量单表1000万数据一次性加载到内存必然OOM。而且生成的SQL文件可能几百MB网络传输和存储都是问题。难点5MongoDB到PostgreSQL的类型鸿沟MongoDB的ObjectId、BSON对象、数组类型PostgreSQL都不支持。需要做复杂的类型映射和转换。难点6网络隔离架构塔外和塔内网络完全隔离传统的ETL工具DataX根本用不了。它们都是读→处理→写的单机模式需要同时访问源库和目标库。解决方案自己搭建一个类似navicat的导入/导出能动态执行SQL的功能。难点7表间依赖关系导致的顺序问题部分表之间存在外键依赖关系如order_items依赖orders如果并发同步order_items先执行插入但orders还未同步 →外键约束失败需要识别依赖关系先同步父表再同步子表保证数据完整性解决方案塔内扫描SQL文件时优先处理父表再并发处理其他表灵感来源Navicat是怎么做的某天深夜我打开Navicat准备手动导出第一批测试数据。盯着导出向导发呆的时候突然脑子里闪过一个念头Navicat是怎么做到导出任意表的我点开导出的.sql文件-- 删除旧表 DROP TABLE IF EXISTS demo_table; -- 重建表结构 CREATE TABLE demo_table ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(50) DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB; -- 插入数据 INSERT INTO demo_table VALUES (1, test);豁然开朗Navicat的核心逻辑就是用SHOW CREATE TABLE获取表结构用SELECT *查询数据生成标准SQL文件用户手动在目标库执行如果我把这套逻辑自动化呢塔外自动查表结构、自动查数据、自动生成SQL、自动上传OSS塔内自动扫描OSS、自动读取SQL文件、自动执行这不就完美契合了塔外-塔内的架构约束吗核心方案设计整体架构流程技术选型说明塔外系统技术栈组件选型使用场景选型理由消息队列RocketMQ触发同步异步解耦进行SQL文件构造支持TAG过滤(MySQLToMySQL/MongodbToPgSQL顺序消费保证数据一致性支持可后续扩展同步类型例如RedisToMySQL流式处理JDBC StreamMongoTemplate读取超大表数据避免OOMsetFetchSize(Integer.MIN_VALUE)启用MySQL服务器端游标Mongo使用流式读取的api内存占用恒定配置管理MySQL配置表管理同步规则配置驱动新增表无需改代码支持占位符动态替换{shopId}/{companyId}文件上传阿里云OSS SDKSQL文件上传唯一能打通塔外塔内的桥梁可用性99.995%支持大文件塔内系统技术栈组件选型使用场景选型理由并发控制CompletableFuture并发处理多个SQL文件JDK8原生无需引入第三方库轻量级异步编程文件下载阿里云OSS SDKSQL文件下载和删除流式下载支持逐行读取执行成功后立即删除防止重复批量执行JDBC BatchSQL批量执行1000条/批平衡性能和内存setAutoCommit(true)防止事务过大第一难100张表结构各异怎么动态生成SQL传统方案的绝望之路如果用传统MyBatis写法画面会是这样!-- 表1的Mapper -- select idqueryTable1 SELECT id, name, create_time FROM table_1 WHERE shop_id #{shopId} /select !-- 表2的Mapper -- select idqueryTable2 SELECT id, title, status FROM table_2 WHERE company_id #{companyId} /select !-- ...重复100次... --手写100个Mapper别说一个月一年都写不完而且后续新增表还得继续写代码复用度约等于0。灵感来源SHOW CREATE TABLEMySQL提供了一个神器SHOW CREATE TABLESHOW CREATE TABLE user_info;输出CREATE TABLE user_info ( id int(11) NOT NULL AUTO_INCREMENT, username varchar(50) DEFAULT NULL, create_time datetime DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB;拿到建表语句 拿到了一切表信息字段名、类型、主键...核心实现动态解析表结构public TableStructure getTableStructure(DataSource ds, String tableName) { String sql SHOW CREATE TABLE tableName ; try (Connection conn ds.getConnection(); Statement stmt conn.createStatement(); ResultSet rs stmt.executeQuery(sql)) { if (rs.next()) { String ddl rs.getString(2); // 第2列是DDL语句 // 核心正则解析DDL语句 ListString columns parseColumns(ddl); // 提取字段名 String primaryKey parsePrimaryKey(ddl); // 提取主键 return new TableStructure(columns, primaryKey); } } return null; }关键亮点表名转义防止关键字冲突如表名叫order、user正则解析DDL一次性获取字段、主键、类型信息零硬编码任何表都能自动处理后续新增表只需加配置你问怎么知道哪张表要同步表名从哪来请继续往下看...第三难中有解决方案通过配置表实现这里用到JDBC编程适合当前业务需求古法编程不得已而为之生成完整SQL文件拿到表结构后生成标准SQL文件// 1. 先删除目标环境的旧数据保证幂等性 String deleteStatement DELETE FROM user_info WHERE shop_id 12345;\n; // 2. 批量插入新数据(每批1000条) String insertStatement INSERT INTO user_info (id, username, create_time) VALUES\n (1, Alice, 2025-01-01 12:00:00),\n (2, Bob, 2025-01-02 13:00:00);\n;上传到OSS后塔内直接逐行读取执行完美第二难数据里有分号SQL会被切割炸掉问题现场默认SQL语句以;结尾但数据内容可能包含各种特殊情况-- 情况1: 数据中包含分号 INSERT INTO content VALUES (1, 教程:Java;Spring;MyBatis); -- 情况2: 数据以分号结尾 INSERT INTO config VALUES (2, path/usr/local/bin;); -- 情况3: 数据中有换行符,且以;结尾 INSERT INTO article VALUES (3, 第一行 第二行; 第三行);塔内如果用;判断SQL结束String line reader.readLine(); // 只读到: INSERT INTO content VALUES (1, 教程:Java // 数据被截断了导致SQL切割错位、语法报错。解决方案特殊符号标记 逐行读取核心思路每条SQL独占一行用特殊符号;#END#标记结束塔外生成SQL时// 关键使用特殊符号作为SQL结束标记 String SPECIAL_DELIMITER ;#END#; // 构造SQL数据内容里的分号、换行符都不处理 String sql INSERT INTO content VALUES (1, Java;Spring); // 写入文件每条SQL独占一行以特殊符号结尾 writer.write(sql SPECIAL_DELIMITER); writer.write(System.lineSeparator()); // 系统换行符上传到OSS的文件内容INSERT INTO content VALUES (1, Java;Spring);#END# INSERT INTO config VALUES (2, path/usr/bin;);#END# INSERT INTO article VALUES (3, 第一行\n第二行);#END#说明每条SQL独占一行以System.lineSeparator()换行每条SQL以;#END#结尾完整的SQL结束标记数据内容里的分号;、换行符\n等都保持原样塔内执行前还原try (BufferedReader reader new BufferedReader( new InputStreamReader(ossStream))) { ListString sqlBatch new ArrayList(); StringBuilder currentSql new StringBuilder(); String line; while ((line reader.readLine()) ! null) { // 拼接当前行 currentSql.append(line); // 检查是否是完整的SQL以;#END#结尾 if (currentSql.toString().endsWith(;#END#)) { // 还原特殊符号 → 正常分号 String realSql currentSql.toString().replace(;#END#, ;); // 添加到批次 sqlBatch.add(realSql); currentSql.setLength(0); // 清空准备下一条SQL // 批量执行每500条一批 if (sqlBatch.size() 100) { executeBatch(stmt, sqlBatch); sqlBatch.clear(); } } } // 执行剩余SQL if (!sqlBatch.isEmpty()) { executeBatch(stmt, sqlBatch); } }