图解DataX执行流程
2020-02-11 11:25:34 6 举报
一张图搞定 DataX 源码流程
作者其他创作
大纲/内容
获取配置: job.preHandler.pluginType 配置类型如下:READER(\"reader\
初始化 Writer: jobWriter
获取 reader 配置文件
5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报
使用 JarLoader 加载各种依赖 jar : /tools/datax/plugin/reader/mysqlreader
6.最后还要汇报一次
启动 containerjobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、 post以及destroy和statistics1、preHandle():job前置操作2、init():初始化reader和writer3、prepare():执行插件的prepare操作4、split():切分任务5、schedule():执行任务6、post():执行插件的post操作7、postHandle():job后置操作8、invokeHooks():调用hook9、输出统计结果
打印相关信息,并校验json文件的合法性VMInfo vmInfo = VMInfo.getVmInfo();ConfigurationValidate.doValidate(configuration);
taskReader.init();taskReader.prepare();taskReader.startRead(recordSender);taskReader.post();taskReader.destroy();
doStart()
创建任务收集器: JobPluginCollector
taskWriter.init();taskWriter.prepare();taskWriter.startWrite(recordReceiver);taskWriter.post();taskWriter.destroy();
TaskExecutor#doStart 启动writerThread 启动readerThread
调用程序入口: Engine.entry(args);
9、输出统计结果
1、preHandle():job前置操作
6、post():执行插件的post操作
初始化1. 设置配置文件 configuration2. 初始化监控3. 设置 jobId4. 设置taskGroupId5. 设置 channel 实例 com.alibaba.datax.core.transport.channel.memory.MemoryChannel6. 设置架空com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector
7、postHandle():job后置操作
1. 状态check时间间隔,较短,可以把任务及时分发到对应channel中2. 状态汇报时间间隔,稍长,避免大量汇报3. 2分钟汇报一次性能统计4. 获取任务配置 List<Configuration> taskConfigs5. 构建待运行的任务队列 List<Configuration> taskQueue = buildRemainTasks(taskConfigs);6. 开始执行任务 [while (true) 死循环]
缓存数据存放到队列中MemoryChannel # ArrayBlockingQueue<Record>
4、split():切分任务[将一个查询 sql根据 splitPk 范围切分为多个任务] 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果, 达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起, 然后,为避免顺序给读写端带来长尾影响,将整合的结果shuffler掉 4.1、计算限速和并发,即实际的channel数和每个channel的限速,主要在adjustChannelNumber()中,这里不做过多说明 4.2、根据实际的channel数,切分reader端,具体的切分逻辑reader插件可以自行实现 4.3、根据reader端切分的数目切分writer端,达到reader:writer=1:1,这样每个task中都包含一个reader和一个writer
while(true)
启动engine.start(configuration);Engine engine = new Engine();
3、prepare():执行插件的prepare操作
1. 更新线程名字2. 创建: DefaultJobPluginCollector3. 初始化Reader4. 初始化 Writer
启动 containercom.alibaba.datax.core.job.JobContainer#start
分别执行reader和writer插件Job中的prepare函数即可,同样,每次执行前都会先加载对应的classLoader用于隔离
拉取 Record
启用: perfReportEnable 默认值 true注: standlone模式的datax shell任务不进行汇报
start 方法1、初始化task执行相关的状态信息,分别是taskId->Congifuration的map、 待运行的任务队列taskQueue、 运行失败任务taskFailedExecutorMap、 运行中的任务runTasks、 任务开始时间taskStartTimeMap2、循环检测所有任务的执行状态 1)判断是否有失败的task,如果有则放入失败对立中,并查看当前的执行是否支持重跑和failOver, 如果支持则重新放回执行队列中; 如果没有失败,则标记任务执行成功,并从状态轮询map中移除 2)如果发现有失败的任务,则汇报当前TaskGroup的状态,并抛出异常 3)查看当前执行队列的长度,如果发现执行队列还有通道,则构建TaskExecutor加入执行队列,并从待运行移除 4)检查执行队列和所有的任务状态,如果所有的任务都执行成功,则汇报taskGroup的状态并从循环中退出 5)检查当前时间是否超过汇报时间检测,如果是,则汇报当前状态 6)当所有的执行完成从while中退出之后,再次全局汇报当前的任务状态
读取用户配置的json文件,转化为内部的configuration配置
PerfTrace 创建&初始化
存入Record
初始化 Reader: initJobReader
2、init():初始化reader和writer
初始化PluginLoader,设置pluginConfigs,方便后面插件来获取
根据插件类型和插件的名称获取jar 加载器 JarLoader
8、invokeHooks():调用hook
创建 Container1.设置运行模式: standalone ? 2.创建JobContainer3.返回job 实例 ID: core.container.job.id = -1 ?????
调用程序入口: com.alibaba.datax.core.Engine入参: -mode standalone -jobid -1 -job /a/tmp/datax/datax.conf
python datax.py /a/tmp/datax/datax.conf
使用 JarLoader 加载各种依赖 jar : /tools/datax/plugin/reader/mysqlwriter
TaskGroupContainer#start
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tools/datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tools/datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=/tools/datax-Dlogback.configurationFile=/tools/datax/conf/logback.xml -classpath /tools/datax/lib/*:. -Dlog.file.name=tmp_datax_datax_conf com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job /a/tmp/datax/datax.conf
加载配置参数解析了java命令行的三个参数,分别是job、jobid和mode,其中job是用户配置的json文件路径,jobid和mode是python文件带进来的ConfigParser会解析Job、Plugin、Core配置文件全部信息
new TaskExecutor()创建 new WriterRunner(taskPlugin) 设置其任务信息 & 创建 writer 线程并设置类加载器创建 new ReaderRunner(taskPlugin) 设置其任务信息 & 创建 reader 线程并设置类加载器
获取 writer 配置文件
获取 job 信息Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
5、schedule():执行任务 5.1、计算taskGroup个数5.2、将切分的task分配到taskGroup中5.3、启动线程池执行taskGroup
收藏
收藏
0 条评论
下一页