Heterogeneous Data Sync → Notes on Optimizing the DataX Synced-Row Count
Over late-night snacks with a good friend, I was complaining about my blind dates.

Me: For the past five years I'd say I've been an upright man. I don't smoke, I don't drink, I don't flirt with girls, and I keep regular hours: asleep at 21:00, up at 06:00. I'm good-natured too: quiet, honest, and very obedient.

Finally I sighed: with so many good points, why does none of the girls I meet want me?

Friend: You've only been out for half a year and you've already forgotten how we got in? Back then you had no money and still took me to PC. I'm too embarrassed to say more.

Buffer blocking

In "Heterogeneous data source sync: data synchronization → an interesting DataX modification" I mentioned that Runtime.getRuntime().exec can block. The root cause is a deadlock triggered by a full buffer. When a Runtime object calls exec(cmd), the JVM starts a child process, and that process is connected to the JVM process through three pipes: standard input (stdin), standard output (stdout), and standard error (stderr). Each of these pipes has a fixed-size buffer in the operating system. If the child process keeps writing to stdout or stderr while the parent process (the JVM) does not read promptly through Process.getInputStream() and Process.getErrorStream(), the buffer fills up. Once the buffer is full, the child's write call blocks and the child is suspended. If the parent has meanwhile called process.waitFor() to wait for the child to exit, the result is a classic deadlock: the parent waits for the child to finish, the child waits for the parent to drain the buffer, and the program hangs forever.

We used two threads to read stdout and stderr separately:

```java
Process process = Runtime.getRuntime().exec(DATAX_COMMAND);
// Read standard output on a separate thread
new Thread(() -> {
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).start();
// Read standard error on a separate thread
new Thread(() -> {
    try (BufferedReader errorReader = new BufferedReader(
            new InputStreamReader(process.getErrorStream(), SYSTEM_ENCODING))) {
        String line;
        while ((line = errorReader.readLine()) != null) {
            System.out.println(line);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).start();
// Wait for the command to finish
int i = process.waitFor();
if (i == 0) {
    System.out.println("job finished");
} else {
    System.out.println("job failed");
}
```

If standard output and standard error do not need to be told apart, the error stream can be merged into standard output:

```java
List<String> DATAX_COMMNDS = Arrays.asList("java", "-server", "-Xms4g", "-Xmx4g", "-Dfile.encoding=GBK",
        "-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener",
        "-Ddatax.home=E:\\git-project\\datax-home",
        "-Dlogback.configurationFile=E:\\git-project\\datax-home\\conf\\logback.xml",
        "-classpath", "E:\\git-project\\datax-home\\lib\\*",
        "com.alibaba.datax.core.Engine",
        "-mode", "standalone",
        "-job", "E:\\git-project\\datax-home\\job\\mysql2mysql.json");
ProcessBuilder pb = new ProcessBuilder(DATAX_COMMNDS);
// Merge the error stream into standard output
pb.redirectErrorStream(true);
Process process = pb.start();
// Read standard output on a separate thread
new Thread(() -> {
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).start();
// Wait for the command to finish
int i = process.waitFor();
if (i == 0) {
    System.out.println("job finished");
} else {
    System.out.println("job failed");
}
```

Creating a new thread to read the output stream on every job run is not reasonable, because creating and destroying threads both consume resources. A thread pool is the better choice. How to size the pool depends on the workload (IO-bound or CPU-bound), so I won't go into it here.

Synced record count

To track the DataX synced record count in real time, we modified DataX's log output so that the number of records written to the target database on each write is printed to the log. We then read those counts from the log, accumulate them, and update the database in real time. For the implementation, see "Heterogeneous data sync → How to get the DataX synced data volume?". That post stresses that persisting to the database must use an incremental update:

```sql
update table_name set sync_rows = sync_rows + syncRows;
```

The implementation looks roughly like this:

```java
private static final List<String> DATAX_COMMNDS = Arrays.asList("java", "-server", "-Xms4g", "-Xmx4g", "-Dfile.encoding=GBK",
        "-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener",
        "-Ddatax.home=E:\\git-project\\datax-home",
        "-Dlogback.configurationFile=E:\\git-project\\datax-home\\conf\\logback.xml",
        "-classpath", "E:\\git-project\\datax-home\\lib\\*",
        "com.alibaba.datax.core.Engine",
        "-mode", "standalone",
        "-job", "E:\\git-project\\datax-home\\job\\mysql2mysql.json");

@Test
public void test() {
    // Create the job log record first
    Long jobId = 1L;
    LocalDateTime now = LocalDateTime.now();
    int execStatus = -1;
    String msg = "";
    QslJobLog qslJobLog = new QslJobLog(jobId, execStatus, 0L, now, now);
    qslJobLogDao.insert(qslJobLog);
    try {
        ProcessBuilder pb = new ProcessBuilder(DATAX_COMMNDS);
        // Merge the error stream into standard output
        pb.redirectErrorStream(true);
        Process process = pb.start();
        // Read the standard stream asynchronously on a pool thread
        Future<String> streamFuture = readStream(process, qslJobLog.getId());
        msg = streamFuture.get();
        // Wait for the command to finish
        int i = process.waitFor();
        if (i == 0) {
            execStatus = 1;
            LOGGER.info("job[{}] finished", jobId);
        } else {
            LOGGER.error("job[{}] failed", jobId);
            execStatus = 0;
        }
    } catch (Exception e) {
        execStatus = 0;
        msg = "job execution error: " + e.getMessage();
        LOGGER.error("job execution error", e);
    }
    now = LocalDateTime.now();
    qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
            .eq(QslJobLog::getId, qslJobLog.getId())
            .set(QslJobLog::getExecStatus, execStatus)
            .set(QslJobLog::getUpdateTime, now)
            .set(QslJobLog::getRemark, msg));
}

private Future<String> readStream(Process process, Long jobLogId) {
    return executorService.submit(() -> {
        String threadName = Thread.currentThread().getName();
        LOGGER.info("thread[{}] started reading the job log", threadName);
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains("sync rows")) {
                    // Assumes a line of the form "sync rows:<count>"; adjust the
                    // delimiter to whatever the modified DataX actually prints
                    long syncRows = Long.parseLong(line.split(":")[1]);
                    // Increment in SQL instead of overwriting the column
                    qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
                            .eq(QslJobLog::getId, jobLogId)
                            .setSql("sync_rows = sync_rows + " + syncRows));
                } else {
                    sb.append(line).append(StrPool.CRLF);
                }
                LOGGER.info(line);
            }
        } catch (IOException e) {
            LOGGER.error("error while reading the log", e);
        }
        LOGGER.info("thread[{}] finished reading the job log", threadName);
        // Keep only the last 20000 characters
        return sb.length() > 20000 ? sb.substring(sb.length() - 20000, sb.length()) : sb.toString();
    });
}
```

The table tbl_qsl_job_log is defined as follows:

```sql
CREATE TABLE `tbl_qsl_job_log` (
  `id` bigint NOT NULL COMMENT 'primary key id',
  `job_id` bigint NOT NULL COMMENT 'job id',
  `sync_rows` bigint NOT NULL DEFAULT 0 COMMENT 'number of synced rows',
  `exec_status` tinyint DEFAULT NULL COMMENT 'execution status; -2: waiting, -1: running, 0: failed, 1: succeeded',
  `remark` text COMMENT 'datax execution log',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `update_time` datetime DEFAULT NULL COMMENT 'last modified time',
  PRIMARY KEY (`id`)
);
```

A note on the readStream method: StringBuilder sb collects the DataX log output (excluding lines that contain sync rows), and only the last 20000 characters are written to the database. The purpose is to make the DataX execution log easy to view from the platform.

Where do you see room for optimization in the code above? Here are the adjustments I made.

Remove the logic that stores the DataX log in the database and point directly at the DataX job log file

Once the DataX log is no longer stored in the database, readStream changes to:

```java
private Future<Long> readStream(Process process, Long jobLogId) {
    return executorService.submit(() -> {
        String threadName = Thread.currentThread().getName();
        long totalRows = 0;
        LOGGER.info("thread[{}] started reading the job log, jobLogId={}", threadName, jobLogId);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains("sync rows")) {
                    // Assumes a line of the form "sync rows:<count>"; adjust the
                    // delimiter to whatever the modified DataX actually prints
                    long syncRows = Long.parseLong(line.split(":")[1]);
                    totalRows += syncRows;
                    qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
                            .eq(QslJobLog::getId, jobLogId)
                            .setSql("sync_rows = sync_rows + " + syncRows));
                }
                LOGGER.info(line);
            }
        } catch (IOException e) {
            LOGGER.error("error while reading the log", e);
        }
        LOGGER.info("thread[{}] finished reading the job log, jobLogId={}", threadName, jobLogId);
        return totalRows;
    });
}
```

Since the platform now relies on the DataX log file, that file becomes more important, and its layout and management need to be more disciplined. DataX's logback.xml is adjusted as follows:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="log.dir" value="${datax.home}/log/" />
    <property name="job.id" value="${job.id}" />
    <property name="job.log.id" value="${job.log.id}" />
    <property name="ymd" value="${current.day}"/>
    <property name="byMillionSecond" value="${current.time.millis}"/>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <Encoding>UTF-8</Encoding>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <charset>UTF-8</charset>
        <file>${log.dir}/${ymd}/${job.id}/${job.log.id}-${byMillionSecond}.log</file>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{0} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="${loglevel:-INFO}">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
    </root>
</configuration>
```

The file references 5 variables, which can be passed to DataX's logback as system properties:

```java
private static final String DATAX_HOME_PATH = "E:\\git-project\\datax-home";

@Test
public void test() {
    // Create the job log record first
    Long jobId = 2L;
    LocalDateTime now = LocalDateTime.now();
    int execStatus = -1;
    long totalRows = 0;
    QslJobLog qslJobLog = new QslJobLog(jobId, execStatus, 0L, now, now);
    qslJobLogDao.insert(qslJobLog);
    String currentDay = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    String currentTimeMillis = now.format(DateTimeFormatter.ofPattern("HH_mm_ss.SSS"));
    List<String> DATAX_COMMNDS = Arrays.asList("java", "-server", "-Xms4g", "-Xmx4g", "-Dfile.encoding=GBK",
            "-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener",
            "-Ddatax.home=" + DATAX_HOME_PATH,
            "-Dlogback.configurationFile=" + DATAX_HOME_PATH + "\\conf\\logback.xml",
            "-Djob.id=" + jobId,
            "-Djob.log.id=" + qslJobLog.getId(),
            "-Dcurrent.day=" + currentDay,
            "-Dcurrent.time.millis=" + currentTimeMillis,
            "-classpath", DATAX_HOME_PATH + "\\lib\\*",
            "com.alibaba.datax.core.Engine",
            "-mode", "standalone",
            "-job", DATAX_HOME_PATH + "\\job\\mysql2mysql.json");
    // Must match the <file> pattern in logback.xml
    String jobLogPath = DATAX_HOME_PATH + "\\log\\" + currentDay + "\\" + jobId + "\\"
            + qslJobLog.getId() + "-" + currentTimeMillis + ".log";
    try {
        ProcessBuilder pb = new ProcessBuilder(DATAX_COMMNDS);
        // Merge the error stream into standard output
        pb.redirectErrorStream(true);
        Process process = pb.start();
        // Read the standard stream asynchronously on a pool thread
        Future<Long> streamFuture = readStream(process, qslJobLog.getId());
        totalRows = streamFuture.get();
        // Wait for the command to finish
        int i = process.waitFor();
        if (i == 0) {
            execStatus = 1;
            LOGGER.info("job[{}] finished, totalRows={}", jobId, totalRows);
        } else {
            LOGGER.error("job[{}] failed", jobId);
            execStatus = 0;
        }
    } catch (Exception e) {
        execStatus = 0;
        LOGGER.error("job execution error", e);
    }
    now = LocalDateTime.now();
    qslJobLogDao.update(new LambdaUpdateWrapper<QslJobLog>()
            .eq(QslJobLog::getId, qslJobLog.getId())
            .set(QslJobLog::getExecStatus, execStatus)
            .set(QslJobLog::getUpdateTime, now)
            .set(QslJobLog::getRemark, jobLogPath));
}
```

datax.home: the DataX home path; in the example, E:\git-project\datax-home
job.id: the platform job id; in the example, 2
job.log.id: the log id of one platform job execution, generated on each run by MyBatis-Plus's snowflake algorithm
current.day: the date on which the platform job runs, in the format yyyy-MM-dd
current.time.millis: the hour, minute, second, and millisecond at which the platform job runs, in the format HH_mm_ss.SSS

In the sample code, jobLogPath is written to the remark column of tbl_qsl_job_log. This is not recommended; a dedicated column (for example datax_log_path) should be added to store it. After a job finishes, the log file sits at ${datax.home}\log\<current.day>\<job.id>\<job.log.id>-<current.time.millis>.log, and the same path is stored in tbl_qsl_job_log.

Remove the asynchronous wait to shrink the gap between the end of the platform job and the end of the DataX job

Let's look closely at the following lines of code:

// Read the standard stream asynchronously on a pool thread