点亮Starhttps://github.com/apache/seatunnel来源 | 数仓生态圈概述当有多个数据库中的一张表具有类似的数据例如有多个不同的系统来源数据插入不同数据库中的表。此时若需要把这些表数据汇总成一张表进行统计分析会面临一个棘手问题由于多张表具有相同的主键直接汇总到一起会出现主键重复。而 Apache SeaTunnel 就能巧妙应对这一挑战。本篇文章将详细阐述 Apache SeaTunnel 是如何解决此问题的。设计思想本方案旨在将分布在两个独立MySQL数据库datasource1和datasource2中的test表实时、准确地同步汇总到第三个数据库datasource3的test表中。为实现实时同步与数据修改的捕获采用基于MySQL二进制日志Binlog的变更数据捕获CDC技术。为解决源表自增主键ID可能重复的问题目标表将引入“数据来源”字段并与原ID组成联合主键确保数据的唯一性与可追溯性。工具准备1、Mysql5.72、SeaTunnel2.3.12MySQL配置检查Mysql是否开启binlogmysql show variables where variable_name in (log_bin, binlog_format, binlog_row_image, gtid_mode, enforce_gtid_consistency);---------------------------------| Variable_name | Value |---------------------------------| binlog_format | ROW || binlog_row_image | FULL || enforce_gtid_consistency | OFF || gtid_mode | OFF || log_bin | ON |---------------------------------如果log_bin的值不是ON, 配置Mysql配置文件mysql.cnf。log-binmysql-binserver-id1binlog_formatROWbinlog_checksumNONE重启Mysql服务SeaTunnel配置1、安装连接器${SEATUNNEL_HOME}/config/plugin_config文件,增加connector-cdc-mysqlconnector-jdbc安装连接器sh bin/install-plugin.sh2、安装 MySQL 驱动我是用的是mysql-connector-java-8.0.28.jar把jar包放到${SEATUNNEL_HOME}/lib/目录下准备数据CREATE DATABASE source1CHARACTER SET utf8mb4;CREATE DATABASE source2CHARACTER SET utf8mb4;CREATE DATABASE source3CHARACTER SET utf8mb4;use source1;CREATE TABLE test ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(50) CHARACTER SET utf8mb4, PRIMARY KEY (id) USING BTREE) ENGINE InnoDB AUTO_INCREMENT 1 CHARACTER SET utf8mb4 ; insert into test values(1,张三);insert into test values(2,李四); use source2;CREATE TABLE test ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(50) CHARACTER SET utf8mb4, PRIMARY KEY (id) USING BTREE) ENGINE InnoDB AUTO_INCREMENT 1 CHARACTER SET utf8mb4 ; insert into test values(1,王五);insert into test values(2,赵六); use source3;CREATE TABLE test ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(50) CHARACTER SET utf8mb4, sources varchar(50) CHARACTER SET utf8mb4, PRIMARY KEY (id, sources) USING BTREE) ENGINE InnoDB AUTO_INCREMENT 1 CHARACTER SET utf8mb4 ;准备脚本env { parallelism 2 job.mode STREAMING checkpoint.interval 20000} source { MySQL-CDC { plugin_output source_data1 url jdbc:mysql://10.0.12.100:3306/source1 username root password root table-names [source1.test] startup.mode initial } MySQL-CDC { plugin_output source_data2 url jdbc:mysql://10.0.12.100:3306/source2 username root password root table-names [source2.test] startup.mode initial }}transform { Sql { plugin_input source_data1 plugin_output result1 query SELECT *, source1 AS sources FROM source_data1 } Sql { plugin_input source_data2 plugin_output result2 query SELECT *, source2 AS sources FROM source_data2 }}sink { Jdbc { plugin_input [result1,result2] url jdbc:mysql://10.0.12.100:3306/source3 driver com.mysql.cj.jdbc.Driver username root password root database source3 table test generate_sink_sql true primary_keys [id,sources] }}这里使用了一个配置来同步2个库当然也可以配置成2个任务进行同步。这里使用2个source一个source_data1一个source_data2。transform也使用2个Sql进行处理对应的2个source。这里写成固定值来读取sources。sink对应上面的2个Sql输出表test这里设置主键用于修改。这里目标表需要手动创建否则提示下面错误BLOB/TEXT column sources used in key specification without a key length测试1、启动bin/seatunnel.sh --config job/mysql_mysql.conf -m local2、查看数据mysql select * from test;---------------------| id | name | sources |---------------------| 1 | 张三 | source1 || 1 | 王五 | source2 || 2 | 李四 | source1 || 2 | 赵六 | source2 |---------------------3、修改数据use source1;insert into test values(3,钱七);update test set name张三1 where id1;use source2;delete from test where id1;4、查询数据mysql select * from test;----------------------| id | name | sources |----------------------| 1 | 张三1 | source1 || 2 | 李四 | source1 || 2 | 赵六 | source2 || 3 | 钱七 | source1 |----------------------总结以上操作就是把2个库中的表数据同步到另一个库中的一张表的所有过程我有看到其他文章可以自动创建输出表但一直没有测试出来有测试出来的朋友可以留言相告。