组件-拉取篇
2022-12-14 14:21:50 0 举报
组件-拉取篇
作者其他创作
大纲/内容
reader\":--Mysqlreader:配置所需字段 ---配置表-配置间隔和链接urlwriter\":---hdfswriter--\"hadoopConfig\"配置集群信息--配置映射字段-路径可以配置传递参数: \"path\": \"/yjx/app/ods/ods_yjx_goods_full/dt=${dt}\
Interceptor
Job--拆分---task--5个一组taskGroup--启动Reade--Channel--Writer的线程--job监控--所有完成后退出
Channel缓冲
Gmond
head
HDfs
putList
行为域数据采集
Interceptor 拦载器可以根据业努的需求拦截指定的数据
mysql
exec:监听一个日志文件spooldir;监听一个文件夹taildir:监听一些文件,特点命名规范的
event
ChannelSelectors
ChannelProcessor
做大规模的数据迁徙 (关系型数据库-->HDFS)
事实表
Sink
执行流程
Sink2
Agent1
数据预处理
{ \"job\": { \"setting\": { \"speed\": { \"channel\
Datax采集
k-v表
实战--增量读取
全量
推送把批数据写入到临时缓冲区检查Channel容量Channel容量不够,数据回滚
body
Channel2
业务域数据采集
Flume
SinkProcessor
可靠数据落地完成的信息之后,才会将数据从通道中删除。
Sqoop
Source
DataX3.0
关系型数据库->Hdfs
Gmetad
字典表
启动数据页面数据曝光数据事件数据错误数据
展示页面在Gmetad节点上
解耦可以相互组合
一个数据单元,消息头和消息体组成
关系数据库
Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework
HDFS
1、为什么要使用flume级联采集,级联采集的好处是什么?span style=\"font-size: inherit;\
Hdfs->关系型数据库
Source: 数据收集组件。接受客户端的数据输入
行为域ODS开发
流程
doPut
Ganglia
Framework + plugin架构构建
reader\":--Mysqlreader:配置所需字段-配置每天的条件---配置表配置间隔和链接url\"where\": \" create_time>='${dt}' and create_time <'${end_dt}' \
实战配置
实战--全量读取
是对一个事物(实体)进行属性描述的表
实体表
Datax
#!/bin/bashstart_date=$(date -d'-1 day' +'%Y-%m-%d')if [ $1 ];thenstart_date=$1fi# sql=\"load data inpath 'hdfs://hdfsyjx/yjx/app/ods/ods_yjx_goods_collection_inc/dt=${start_date}' into tableods.ods_app_event_log partition(dt='${start_date}');\"sql=\"alter table ods.ods_yjx_goods_collection_inc add if not exists partition(dt='${start_date}');\"echo \"待执行的sql为:$sql\"hive -e \"$sql\"# 判断任务执行是否成功if [ $? -eq 0 ];thenecho \"Success-日志数据日期: $start_date ; 数据源目录:/yjx/ods/ods_yjx_goods_collection_inc/dt=${start_date};目标表:ods_yjx_goods_collection_inc\"exit 0elseecho \"Success-日志数据日期: $start_date ; 数据源目录:/yjx/ods/ods_yjx_goods_collection_inc/dt=${start_date} ; 目标表:ods_yjx_goods_collection_inc\"exit 1fi
事物
抽取原则
python bin/datax.py -p \"-Ddt=2021-01-1\" job/yjx_goods_full.json
HBase
Flume采集
已过时(不维护)
Framework
定义:一个开源集群监视项目,设计用于测量数以千计的节点。
Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
hive建表
一个守护进程,他运行在每一个需要监测的节点上
增量采集
对一件发生过的事情(事实)进行描述的表
定时调度
Agent:一个独立的Flume进程计划中包含 source channel sink是Flume最小的运行单位
Hive
#sink组,配置多个后需要配置处理器a1.sinks.k1.channel = c1a1.sinks.k1.type = avroa1.sinks.k1.hostname = node02a1.sinks.k1.port = 4646a1.sinks.k2.channel = c1a1.sinks.k2.type = avroa1.sinks.k2.hostname = node03a1.sinks.k2.port = 4646#处理器,后续sink需要重这里面获取a1.sinkgroups = p1a1.sinkgroups.p1.sinks = k1 k2a1.sinkgroups.p1.processor.type = failovera1.sinkgroups.p1.processor.priority.k1 = 5a1.sinkgroups.p1.processor.priority.k2 = 10a1.sinkgroups.p1.processor.maxpenalty = 10000
HDFSWriter
a1.sources.r1.channels = c1a1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = g1a1.sources.r1.filegroups.g1 = /opt/data/logdata/app/event_log_*a1.sources.r1.batchSize = 5000#读拦截器a1.sources.r1.interceptors = i1#拦截器种类a1.sources.r1.interceptors.i1.type = com.yjx.flume.interceptor.EventTimeStampInterceptor$EventTimeStampInterceptorBuilder#拦截其名字a1.sources.r1.interceptors.i1.timestamp_fieldname = timeStamp#拦截器头a1.sources.r1.interceptors.i1.event_header = timestamp
source
数据CRUD
sh ods_yjx _goods_collection_inc.sh 2021-01-02
前端埋点
配置举例
日志服务器
takeList
行为域采集
将表与HIVE数据进行绑定操作
当配置多个sink后,需要配置Sink Proceaaor组,可以设置其优先率
特点
mySQlReader
Python bin/datax “-Ddt=2021-01-01 -Dend dt=2021-1-2\" job/yjx goods collection inc.json
优点
业务数据库
Ganglia-web
# bin/flume-ng agent -c conf -f myconf/file2avro_v2.conf -n a1#一个上游两个下游a1.sources = r1a1.sinks = k1 k2a1.channels = c1a1.sources.r1.channels = c1a1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = g1a1.sources.r1.filegroups.g1 = /opt/data/logdata/app/event_log_*a1.sources.r1.batchSize = 5000#读拦截器a1.sources.r1.interceptors = i1#拦截器种类a1.sources.r1.interceptors.i1.type = com.yjx.flume.interceptor.EventTimeStampInterceptor$EventTimeStampInterceptorBuilder#拦截其名字a1.sources.r1.interceptors.i1.timestamp_fieldname = timeStamp#拦截器头a1.sources.r1.interceptors.i1.event_header = timestampa1.channels.c1.type = filea1.channels.c1.checkpointDir = /opt/data/flume-data/checkpoint/a1.channels.c1.dataDirs = /opt/data/flume-data/data/a1.channels.c1.transactionCapacity = 10000#sink组,配置多个后需要配置处理器a1.sinks.k1.channel = c1a1.sinks.k1.type = avroa1.sinks.k1.hostname = node02a1.sinks.k1.port = 4646a1.sinks.k2.channel = c1a1.sinks.k2.type = avroa1.sinks.k2.hostname = node03a1.sinks.k2.port = 4646#处理器,后续sink需要重这里面获取a1.sinkgroups = p1a1.sinkgroups.p1.sinks = k1 k2a1.sinkgroups.p1.processor.type = failovera1.sinkgroups.p1.processor.priority.k1 = 5a1.sinkgroups.p1.processor.priority.k2 = 10a1.sinkgroups.p1.processor.maxpenalty = 10000
1.DataX接受到一个Job之后,启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点2. DataXJob不同的源切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。3. 多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0200张表--->200个task--->计算4个TaskGroup--->每个TaskGroup负责50个task
Sink: 从Channel中读取并移除Event求Event输出到任意的位置
一个守护进程,他定期检查gmonds,运行在主监控节点
0 条评论
下一页