SeaTunnel Data Collection: A Practical Guide

Overview

This document is a complete guide to deploying and using SeaTunnel (v2.3.8) for data collection, covering data synchronization from MongoDB and MySQL (RDS). It shows how to build an automated collection system that runs a data synchronization job once a day.

Other versions differ only slightly. For the actual synchronization configuration options, treat the official documentation as authoritative.

1. Environment Preparation

1.1 System Requirements

| Component | Requirement |
|---|---|
| Operating system | Linux (x86_64 / ARM64) |
| Docker | 20.10+ |
| Kubernetes | 1.20+ (optional) |
| SeaTunnel | 2.3.8 |

1.2 Directory Structure

    seatunnel/
    ├── bin/
    │   ├── mongo_start.sh            # MongoDB task startup script
    │   ├── rds_start.sh              # RDS task startup script
    │   ├── mongoshell-linux-amd64
    │   └── mongoshell-linux-arm64
    ├── config/
    │   ├── mongo_dynamic.template    # MongoDB dynamic task configuration
    │   ├── mongo_static.template     # MongoDB static task configuration
    │   └── rds.template              # RDS task configuration
    ├── seatunnel_mongo.yaml          # K8s MongoDB deployment file
    └── seatunnel_rds.yaml            # K8s RDS deployment file

2. Configuration Files in Detail

2.1 Template Mechanism

Configuration templates use $(VARIABLE_NAME) as a placeholder. When a startup script runs, it replaces each placeholder with the value of the corresponding environment variable. The job configuration files are written in HOCON.

Supported placeholders:

| Placeholder | Description |
|---|---|
| $(MONGODB_URI) | MongoDB connection string |
| $(MONGODB_DATABASE) | MongoDB database name |
| $(RDS_URI) | MySQL JDBC connection prefix |
| $(RDS_USERNAME) | MySQL user name |
| $(RDS_PASSWORD) | MySQL password |

2.2 MongoDB Static Task Configuration

Used to synchronize a fixed MongoDB collection.

    env {
      parallelism = 1
      job.mode = "BATCH"
    }

    source {
      MongoDB {
        uri = "$(MONGODB_URI)"
        database = "$(MONGODB_DATABASE)"
        collection = "skyladder_flowline_logs"
        result_table_name = "skyladder_flowline_logs"
        schema {
          columns = [
            { name = "_id", type = "STRING", nullable = true },
            { name = "projectId", type = "STRING", nullable = true }
          ]
        }
      }
    }

    transform {
      sql {
        source_table_name = ["skyladder_flowline_logs"]
        result_table_name = "sub_skyladder_flowline_logs"
        query = "select projectId as project_id, _id as _id from skyladder_flowline_logs;"
      }
    }

    sink {
      jdbc {
        user = "$(RDS_USERNAME)"
        driver = "com.mysql.cj.jdbc.Driver"
        url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
        password = "$(RDS_PASSWORD)"
        source_table_name = ["sub_skyladder_flowline_logs"]
        generate_sink_sql = true
        database = "metric"
        table = "metric.sub_skyladder_flowline_logs"
        primary_keys = ["_id"]
      }
    }

2.3 MongoDB Dynamic Task Configuration

Supports iterating dynamically over multiple collections by project ID.

    env {
      parallelism = 1
      job.mode = "BATCH"
    }

    source {
      MongoDB {
        uri = "$(MONGODB_URI)"
        database = "$(MONGODB_DATABASE)"
        collection = "3e9d762f34d944c782876ef07723e3ac.npm_allItemData"
        result_table_name = "npm_allItemData"
        schema {
          columns = [
            { name = "_id", type = "STRING", nullable = true },
            { name = "projectId", type = "STRING", nullable = true }
          ]
        }
      }
    }

    transform {
      sql {
        source_table_name = ["npm_allItemData"]
        result_table_name = "sub_kb_workbench"
        query = "select _id as _id, projectId as project_id from npm_allItemData;"
      }
    }

    sink {
      jdbc {
        user = "$(RDS_USERNAME)"
        driver = "com.mysql.cj.jdbc.Driver"
        url = "$(RDS_URI)/metric?useSSL=false&characterEncoding=utf-8"
        password = "$(RDS_PASSWORD)"
        source_table_name = ["sub_kb_workbench"]
        generate_sink_sql = true
        database = "metric"
        table = "metric.sub_kb_workbench"
        primary_keys = ["_id"]
      }
    }

2.4 RDS Task Configuration

    env {
      parallelism = 1
      job.mode = "BATCH"
    }

    source {
      Jdbc {
        result_table_name = "pms_unit_info"
        table_path = "portal.pms_unit_info"
        url = "$(RDS_URI)/portal?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "$(RDS_USERNAME)"
        password = "$(RDS_PASSWORD)"
      }
    }

    transform {
      # No transform is needed here: the sink reads pms_unit_info directly.
    }

    sink {
      Jdbc {
        source_table_name = ["pms_unit_info"]
        generate_sink_sql = true
        database = "metric"
        table = "a_pms_unit_info"
        user = "$(RDS_USERNAME)"
        driver = "com.mysql.cj.jdbc.Driver"
        url = "$(RDS_URI)/metric?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true"
        password = "$(RDS_PASSWORD)"
        schema_save_mode = "IGNORE"
        data_save_mode = "CUSTOM_PROCESSING"
        custom_sql = "truncate table a_pms_unit_info"
      }
    }

3. Writing the Startup Scripts

3.1 MongoDB Startup Script

    #!/bin/bash
    # Abort early if any required environment variable is missing.
    : "${ARCH:?Error: ARCH not set, use x86 or arm}"
    : "${MONGODB_URI:?Error: MONGODB_URI not set}"
    : "${MONGODB_DATABASE:?Error: MONGODB_DATABASE not set}"
    : "${RDS_URI:?Error: RDS_URI not set}"
    : "${RDS_USERNAME:?Error: RDS_USERNAME not set}"
    : "${RDS_PASSWORD:?Error: RDS_PASSWORD not set}"

    home=/sea

    # Pick the mongo shell binary that matches the CPU architecture.
    case "$ARCH" in
      x86) mongoshell="${home}/bin/mongoshell-linux-amd64" ;;
      arm) mongoshell="${home}/bin/mongoshell-linux-arm64" ;;
      *) echo "Error: Unsupported ARCH: $ARCH"; exit 1 ;;
    esac

    # Create working copies of the templates (\cp bypasses any cp alias).
    \cp "${home}/config/mongo_dynamic.template" "${home}/config/mongo_dynamic.conf"
    \cp "${home}/config/mongo_static.template" "${home}/config/mongo_static.conf"

    # Replace the $(NAME) placeholders with the environment variable values.
    sed -i "s#\$(MONGODB_URI)#${MONGODB_URI}#g" "${home}/config/mongo_static.conf"
    sed -i "s#\$(MONGODB_DATABASE)#${MONGODB_DATABASE}#g" "${home}/config/mongo_static.conf"
    sed -i "s#\$(RDS_URI)#${RDS_URI}#g" "${home}/config/mongo_static.conf"
    sed -i "s#\$(RDS_USERNAME)#${RDS_USERNAME}#g" "${home}/config/mongo_static.conf"
    sed -i "s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g" "${home}/config/mongo_static.conf"

    sed -i "s#\$(MONGODB_URI)#${MONGODB_URI}#g" "${home}/config/mongo_dynamic.conf"
    sed -i "s#\$(MONGODB_DATABASE)#${MONGODB_DATABASE}#g" "${home}/config/mongo_dynamic.conf"
    sed -i "s#\$(RDS_URI)#${RDS_URI}#g" "${home}/config/mongo_dynamic.conf"
    sed -i "s#\$(RDS_USERNAME)#${RDS_USERNAME}#g" "${home}/config/mongo_dynamic.conf"
    sed -i "s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g" "${home}/config/mongo_dynamic.conf"

    mkdir -p "${home}/logs"

    # Note: the loop below runs only the static job; mongo_dynamic.conf and
    # $mongoshell are prepared above but not used in this listing.
    while true; do
      now=$(date +%s)
      # Next midnight: GNU date first, BSD date as a fallback.
      tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
      # Last resort: arithmetic on the epoch (this yields midnight UTC).
      [ -z "$tomorrow" ] && tomorrow=$((now - now % 86400 + 86400))
      sleep_seconds=$((tomorrow - now))

      echo "Starting daily task: $(date)"
      /opt/seatunnel/bin/seatunnel.sh --config "${home}/config/mongo_static.conf" -e local \
        >> "${home}/logs/mongo_static-$(date +%Y%m%d).log" 2>&1

      echo "Waiting $sleep_seconds seconds for next run..."
      sleep "$sleep_seconds"
    done

3.2 RDS Startup Script

    #!/bin/bash
    # Abort early if any required environment variable is missing.
    : "${RDS_URI:?Error: environment variable RDS_URI is not set}"
    : "${RDS_USERNAME:?Error: environment variable RDS_USERNAME is not set}"
    : "${RDS_PASSWORD:?Error: environment variable RDS_PASSWORD is not set}"

    # Paths derived from the root directory
    home=/sea
    rds_config_file="${home}/config/rds.conf"
    log_dir="${home}/logs"

    # Create a working copy of the template and fill in the placeholders.
    \cp "${home}/config/rds.template" "$rds_config_file"
    sed -i "s#\$(RDS_URI)#${RDS_URI}#g" "$rds_config_file"
    sed -i "s#\$(RDS_USERNAME)#${RDS_USERNAME}#g" "$rds_config_file"
    sed -i "s#\$(RDS_PASSWORD)#${RDS_PASSWORD}#g" "$rds_config_file"

    # Make sure the log directory exists
    mkdir -p "$log_dir"

    # The path must be ${home}/config/*, not /config/*
    chmod +x "${home}"/config/* 2>/dev/null

    # Loop forever, running the task once per day at midnight
    while true; do
      # Seconds until the next midnight
      now=$(date +%s)
      tomorrow=$(date -d "tomorrow 00:00:00" +%s 2>/dev/null || date -v+1d -v0H -v0M -v0S +%s 2>/dev/null)
      if [ -z "$tomorrow" ]; then
        seconds_today=$((now % 86400))
        sleep_seconds=$((86400 - seconds_today))
      else
        sleep_seconds=$((tomorrow - now))
      fi

      echo "Starting daily task: $(date)"

      # Use today's date in the log file name so logs are split by day
      today=$(date +%Y%m%d)
      static_log="${log_dir}/rds-${today}.log"

      # Append the start time of this run to the log
      echo "Task started: $(date)" >> "$static_log"

      # Run the RDS task once, appending SeaTunnel output to today's log
      echo "Running config file: $rds_config_file"
      /opt/seatunnel/bin/seatunnel.sh --config "$rds_config_file" -e local >> "$static_log" 2>&1

      echo "Daily task finished: $(date)"
      echo "Current time: $(date); waiting $sleep_seconds seconds until the next midnight..."
      sleep "$sleep_seconds"
    done

4. Docker Deployment

4.1 Prepare the Directories

    mkdir -p /opt/data/seatunnel/{bin,config}

4.2 Start the RDS Task

    docker run --rm -d \
      -v /opt/data/seatunnel/:/sea \
      -e RDS_URI="jdbc:mysql://mysql-host:3306" \
      -e RDS_USERNAME="user" \
      -e RDS_PASSWORD="password" \
      apache/seatunnel:2.3.8 \
      sh /sea/bin/rds_start.sh

4.3 Start the MongoDB Task

    docker run --rm -d \
      -v /opt/data/seatunnel/:/sea \
      -e ARCH="x86" \
      -e MONGODB_URI="mongodb://user:pass@mongo-host:27017" \
      -e MONGODB_DATABASE="dbname" \
      -e RDS_URI="jdbc:mysql://mysql-host:3306" \
      -e RDS_USERNAME="user" \
      -e RDS_PASSWORD="password" \
      apache/seatunnel:2.3.8 \
      sh /sea/bin/mongo_start.sh

5. Kubernetes Deployment

5.1 Deployment Example: MongoDB

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: seatunnel-mongo
      namespace: seatunnel
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: seatunnel-mongo
      template:
        metadata:
          labels:
            app: seatunnel-mongo
        spec:
          volumes:
            - name: seatunnel-config
              hostPath:
                path: /data/seatunnel
                type: DirectoryOrCreate
          containers:
            - name: seatunnel
              image: apache/seatunnel:2.3.8
              env:
                - name: ARCH
                  value: "x86"
                - name: MONGODB_URI
                  value: "mongodb://user:pass@mongo-host:27017"
                - name: MONGODB_DATABASE
                  value: "dbname"
                - name: RDS_URI
                  value: "jdbc:mysql://mysql-host:3306"
                - name: RDS_USERNAME
                  value: "user"
                - name: RDS_PASSWORD
                  value: "password"
              command:
                - /bin/sh
                - -c
                - |
                  chmod +x /sea/bin/mongo_start.sh
                  bash /sea/bin/mongo_start.sh
              volumeMounts:
                - name: seatunnel-config
                  mountPath: /sea

5.2 Deployment Example: RDS

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: seatunnel-rds
      namespace: seatunnel
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: seatunnel-rds
      template:
        metadata:
          labels:
            app: seatunnel-rds
        spec:
          volumes:
            - name: seatunnel-config
              hostPath:
                path: /data/seatunnel
                type: DirectoryOrCreate
          containers:
            - name: seatunnel
              image: apache/seatunnel:2.3.8
              env:
                - name: RDS_URI
                  value: "jdbc:mysql://mysql-host:3306"
                - name: RDS_USERNAME
                  value: "user"
                - name: RDS_PASSWORD
                  value: "password"
              command:
                - /bin/sh
                - -c
                - |
                  chmod +x /sea/bin/rds_start.sh
                  bash /sea/bin/rds_start.sh
              volumeMounts:
                - name: seatunnel-config
                  mountPath: /sea

6. Common Problems

| Problem | Solution |
|---|---|
| Wrong line endings in a script (CRLF) | sed -i 's/\r$//' script.sh |
| Connection failure | Check network connectivity and credentials |
| Placeholder not replaced | Confirm the environment variables are passed to the container correctly |
| mongo client | Can be replaced with a locally installed client |

7. Reference Links

- SeaTunnel official documentation
- SeaTunnel GitHub
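The placeholder substitution used by the startup scripts can break silently in two ways: a value that contains a character special to sed (such as & or #, both plausible in a password or a JDBC URL) is mangled, and a placeholder whose variable was never passed stays in the rendered file. The sketch below is one possible way to guard against both. The function names render_template and escape_sed_replacement are hypothetical and not part of the original scripts. It is written in POSIX sh on the assumption that the scripts may be started with sh, as in the Docker examples, and it does not handle values that contain newlines.

```shell
#!/bin/sh
# Hypothetical helpers (not from the original scripts).

# Escape the characters that are special in a sed replacement when '#'
# is the delimiter: backslash, ampersand and '#'.
escape_sed_replacement() {
  printf '%s' "$1" | sed -e 's/[\\&#]/\\&/g'
}

# Usage: render_template TEMPLATE OUTPUT VAR_NAME...
# Copies TEMPLATE to OUTPUT and replaces each $(VAR_NAME) placeholder with
# the value of the shell variable VAR_NAME. Returns non-zero if a variable
# name is invalid or if any $(NAME) placeholder is left unreplaced.
render_template() {
  template=$1
  output=$2
  shift 2
  cp "$template" "$output" || return 1
  for var in "$@"; do
    # Only accept plain identifiers before handing the name to eval.
    case "$var" in
      ''|*[!A-Za-z0-9_]*) echo "Error: bad variable name: $var" >&2; return 1 ;;
    esac
    eval "value=\${$var-}"
    escaped=$(escape_sed_replacement "$value")
    # Write to a temporary file instead of relying on 'sed -i'.
    sed "s#\$($var)#$escaped#g" "$output" > "$output.tmp" || return 1
    mv "$output.tmp" "$output" || return 1
  done
  # Fail loudly if a placeholder survived the substitutions.
  if grep -q '\$([A-Z_][A-Z0-9_]*)' "$output"; then
    echo "Error: unreplaced placeholder in $output" >&2
    return 1
  fi
}

# Example call (paths follow the directory layout used in this guide):
# render_template /sea/config/rds.template /sea/config/rds.conf \
#   RDS_URI RDS_USERNAME RDS_PASSWORD || exit 1
```

Failing at render time keeps a half-filled configuration from ever reaching seatunnel.sh, which makes the "placeholder not replaced" problem show up as an explicit error rather than as a connection failure later on.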