ETL日志表
2022-12-14 14:20:11 0 举报
ETL日志表
作者其他创作
大纲/内容
表描述
文件数据解析
通过Spark程序解析Json文件再写入Hive表中 HDFS(json)--->spark--->Hive
spark及可以写sql也可以写代码
账号设备积分表
数据规范处理Boolean--Y/nString---空-nullDate--yyyy-MM-dd小数—> decimal字符串-string时间戳--bigint
解决地理位置进行维度统计
Spark解析
insert into table dwd.dwd_app_event_detail partition(dt='$start_date1')查询导入
清洗过滤:1.废弃字段(不相干字段)2. 格式不正确的(脏数据)3.缺少关键字段(deviceid/properties/eventid/sessionid )4. 不符合时间段5. 过滤爬虫请求
功能
经纬度ip映射
DWD
val splitedseesion = logbean.rdd.groupBy(bean => bean.sessionId) .flatMap(tp => { val list = tp._2 //分组后的一组会话id相同的值 .toList .sortBy(bean => bean.timeStamp) //根据数据搓排 val uuid = UUID.randomUUID().toString for (i <- 0 until list.size) { list(i).splitedSessionId = uuid if (i < list.size - 1 && list(i + 1).timeStamp - list(i).timeStamp > 30 * 60 * 1000) //时间戳相减大约30分钟则进行分组 list(i).splitedSessionId = UUID.randomUUID().toString } list }).toDS()
ID_MAPPING:全局唯一id
//广播ip字段// 读取ip2reion文件 val fs = FileSystem.get(new Configuration()) val path = new Path(\"/yjx/dict/ip2region.db\
过滤掉日志中不符合时间段的记录
O=R++:升级二进制键/值对组成的 flat 文件 ORC 大小最多减少 75%,数据处理速度大大提高。ORC:hive兼容性好;(二进制文件小,后续还可压缩Snappy)P:兼容性,多平台
过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)
建表
通过Hive兼容的Json解析器直接将Json数据解析到一张表中。(Hive3.X之后提供原生的JsonSerDe)
SESSION分割web端日志,按天session分割app日志,30分钟自动分割wx小程序日志:30min
SESSION分割(30min)
ETL
数据规范处理
解决:有些数据缺少关键信息不能用来计算
main val jedis1 = new Jedis(\"node02\
ODS
解决:将不同数据源采集的数据用相同规则不同
数据集成(地理位置映射)GPS/IP-省市区使用字典表
Redis
T 数据清洗: ( 有不完整的数据、错误的数据、重复的数据 )T 转换: 主要进行不一致的数据转换、数据粒度的转换,以及一些商务规则的计算。
设置默认用户
转换成对象
L 加载将数据加载到DW层
val curLog = spark.read.table(\"ods.ods_app_event_log\") .where(s\"dt='$start_date1'\") import org.apache.spark.sql.functions._ val isNotBlink=udf((s:String)=>{StringUtils.isNotBlank(s)}) import spark.implicits._ val format = new SimpleDateFormat(\"yyyy-MM-dd HH:mm:ss\")// 根据指定格式转成类型 val startTime = format.parse(s\"$start_date1 00:00:00\").getTime val endTime = format.parse(s\"$end_date 00:00:00\").getTime val filtered = curLog .where(isNotBlink($\"deviceid\") and 'properties.isNotNull and isNotBlink('deviceid) and isNotBlink('sessionid)) .where( s\"\"\"timestamp >= $startTime and timestamp < $endTime |\"\"\".stripMargin)
val standard = spark.sql( \"\"\
val curDayLog = spark.read.table(\"ods.ods_app_event_log\") .where(\"dt='2021-01-01' and eventid is not null\
val anonymousFilled = spark.sql( s\"\"\
经纬度知识库
Bzip 压缩70% ,可切分Snappy 压缩不能被切分,压缩率高,压缩解压缩快Lzo lz4可切片
对应操作
//广播经纬度映射表 val areaDict = spark.read.table(\"dim.dim_area_dict\") .where(\"geohash is not null and geohash !=''\
ip知识库
GEOHASH编码
getJsonObject
保存结果输出为parquet格式,压缩编码用snappyParquet.compress=snappy
新老访客标记
可能出现情况
分类
入仓方案
清洗过滤:
将整个Json看作一个字段先存入Hive表中,再通过Hive自带的函数(getJsonObject)解析再写入另一张表
保存结果
行为域DWD开发
解决;文件的保存
解决:间隔时间太长影响数据分析,例如:挂机(同一个session会话30min进行分隔)
对文件类型的解析
#!/bin/bash0,不能有空格 1,可以使用赋值--配合$变量使用2,单引号和双引号没有区别3,$(link语句)可以得到执行语句后的结果4,if语句if[ $1 ];thenStart_date=$1Fi --->如果传参数了,将传参赋值给 Start_date5,$?获取上一个指令的执行结果
1.清洗过滤2.Session分割3.数据规范处理4.地理位置信息映射表5.数据的集成6.设备用户关系绑定表7.ID_MAPPING8.新老访客标记
数据解析将Json打平,解析成扁平格式(Hive兼容的Json解析器JsonSerDe)
实际操作
SparkETL
行为域ODS开发
JsonSerDe
base32
主要作用:直接映射操作数据(原始数据),数据备份;建模方法; 数据结构完全一致(只做拉取,行为数据有简单的操作,时间戳)存储周期:定期(半年或三个月)进行资源释放压缩
linux脚本
1.数据解析2.结果保存(parquet)
新老访客标记: 可以知道金额为零到底是新用户还是老用户新-1老-0
0 条评论
下一页