离线项目重点总结
2023-03-27 15:21:54 0 举报
AI智能生成
对本人实际工作中的电商离线项目总结,需求和实现详细,有需要的拿走
作者其他创作
大纲/内容
连续活跃区间表的实现思路
活跃行为
如登录一次,访问时长超过10分钟、消费1次等等,之后用户每次做出活跃行为,即记录为:活跃1次
核心思想
记录着每个人的每天活跃状态,但是又不用每天都存储一条记录
分析举例
最近一个月内,有过连续活跃10+天的人
最近一个月内,每个用户的平均活跃天数
最近一个月内,连续活跃[1-10)天的人数,[10-20)天的人数,[20+天的人数
任意指定的一段日期范围内,连续活跃5+天的人
最近30天内,沉默天数超过3的有多少人,超过5天的有多少人
最近一个月内,每个用户的平均活跃天数
最近一个月内,连续活跃[1-10)天的人数,[10-20)天的人数,[20+天的人数
任意指定的一段日期范围内,连续活跃5+天的人
最近30天内,沉默天数超过3的有多少人,超过5天的有多少人
实现思路
方案一
普通实现
1、在dws层创建连续活跃区间记录表以及日活表。
2、在连续活跃区间记录表中过滤出所有不带结束日期end_dt的区间记录
3、接着再过滤出所有带结束日期end_dt的记录,并且将过滤出来的记录与当天日活表进行full join
4、只有在过滤出结束日记的区间记录表中有记录,且日活表中没有记录的数据(这里的记录指guid,是用户在当天是否有活跃),结束日期的区间end=current-1day。其余的都取结束日期。
5、最后,将过滤出的不带结束日期的区间记录表与步骤四做好的表进行union all即可。
2、在连续活跃区间记录表中过滤出所有不带结束日期end_dt的区间记录
3、接着再过滤出所有带结束日期end_dt的记录,并且将过滤出来的记录与当天日活表进行full join
4、只有在过滤出结束日记的区间记录表中有记录,且日活表中没有记录的数据(这里的记录指guid,是用户在当天是否有活跃),结束日期的区间end=current-1day。其余的都取结束日期。
5、最后,将过滤出的不带结束日期的区间记录表与步骤四做好的表进行union all即可。
方案二
bitmap实现
核心:利用一个bitmap来记录一个人在最近30天内的活跃情况
在全局的层面上,为所有用户维护一张“维度信息表”(GUID、省、市等信息)
在全局的层面上,为所有用户维护一张“维度信息表”(GUID、省、市等信息)
1、求指定日期范围内连续活跃的人:指定日期范围内的bit位全为1
2、求指定日期范围内每个人的活跃天数:把活跃状态值转成二进制的字符串,然后把0去掉,求去0后的字符串长度(等价于1的个数)
3、求最近一个月内,每个人的最大连续活跃天数:将状态二进制字符串按照0的组合分割成多个1的字符串,然后炸裂,然后按照用户分组,取“1”字符串的最大长度
2、求指定日期范围内每个人的活跃天数:把活跃状态值转成二进制的字符串,然后把0去掉,求去0后的字符串长度(等价于1的个数)
3、求最近一个月内,每个人的最大连续活跃天数:将状态二进制字符串按照0的组合分割成多个1的字符串,然后炸裂,然后按照用户分组,取“1”字符串的最大长度
为什么要进行级联采集
概述
所谓级联,是指一个flume的sink作为另一个flume的source,常用于收集其它服务器的日志到日志服务器
详解
若由一层的flume去进行采集并将文件放入到HDFS中对flume节点的性能是个极大的挑战,所以分为两层进行数据采集任务的执行,即上下游两层,上游flume为将nginx产生的日志进行采集并将数据传输给下游flume,下游的flume将收到的日志文件传输给HDFS
拦截器的编写思路是什么
flume中拦截器的作用
1、时间拦截器:将时间戳插入到flume的事件报表头中
2、主机名或IP拦截器:在header里加入IP或者主机名
3、静态拦截器:将k-v插入到事件的报表头中
4、正则拦截器:通过正则表达式将一些不需要的日志过滤掉,也可以收集只满足正则条件的日志
5、自定义拦截器:根据实际业务的需求,更好的满足数据在应用层的处理,通过自定义flume拦截器,过滤掉不需要的字段,并对指定的字段进行加密处理,将源数据进行预处理,减少了数据的传输量,降低了存储的开销
2、主机名或IP拦截器:在header里加入IP或者主机名
3、静态拦截器:将k-v插入到事件的报表头中
4、正则拦截器:通过正则表达式将一些不需要的日志过滤掉,也可以收集只满足正则条件的日志
5、自定义拦截器:根据实际业务的需求,更好的满足数据在应用层的处理,通过自定义flume拦截器,过滤掉不需要的字段,并对指定的字段进行加密处理,将源数据进行预处理,减少了数据的传输量,降低了存储的开销
实现思路
首先,在conf配置文件中,在上游source处添加interceptor,选择拦截器的type(种类)为自定义拦截器,设置两个自定义变量
type的值为自己写的一个类与类的调度类,类中的具体操作,分别获取event的body与header,body即为数据本身,再通过body获取数据中的timstamp字段的具体值,将值当作value,timestamp字符串作为key插入到header中,对于每个event都进行一次上述操作即可完成flume拦截器的开发。
type的值为自己写的一个类与类的调度类,类中的具体操作,分别获取event的body与header,body即为数据本身,再通过body获取数据中的timstamp字段的具体值,将值当作value,timestamp字符串作为key插入到header中,对于每个event都进行一次上述操作即可完成flume拦截器的开发。
数据零点漂移问题
什么是零点漂移
对于日志,flume、sink写入HDFS时,如果按照时间生成文件,在没有明确指定时间的情况下,会读取服务器时间作为创建文件的依据,这会导致日志的实际生成日期与文件不符
即在按天生成日志文件的情况下,一条23:59:59左右生成的日志发送到服务器后可能已经是第二天了,如果没有指定时间,会被写入第二天对应的文件中,这就是所谓的零点漂移
即在按天生成日志文件的情况下,一条23:59:59左右生成的日志发送到服务器后可能已经是第二天了,如果没有指定时间,会被写入第二天对应的文件中,这就是所谓的零点漂移
解决
要解决零点漂移问题,通常是将日志中记录的日志创建时间提取出来,写入flume事件头的timestamp字段,有了这个字段,flume创建文件时,会依据这个字段创建文件,这种场景很类似spqrk、flink的时间和处理事件。
Datax如何配置HDFS的HA
说明
1、单个file支持多线程并发读取,这里涉及到单个File内部切分算法。
2、目前还不支持hdfs HA
2、目前还不支持hdfs HA
实现
1、通过shell脚本动态传参传入对应的三个参数
nameService为cdh配置高可用时设置的nameService1,myFS和myFSBac为对应namenode节点的8020端口服务
2、将hdfs-site.xml core-site.xml hive-site.xml 三个文件放到hdfsWriter.jar文件中去。
因为在hdfs-site.xml中已经指明了dfs.nameservice=nameservice1及其他高可用的配置。
nameService为cdh配置高可用时设置的nameService1,myFS和myFSBac为对应namenode节点的8020端口服务
2、将hdfs-site.xml core-site.xml hive-site.xml 三个文件放到hdfsWriter.jar文件中去。
因为在hdfs-site.xml中已经指明了dfs.nameservice=nameservice1及其他高可用的配置。
Datax可以做并发数据同步嘛
Datax工作流程
一个DataX Job会切分成多个Task,每个Task会按TaskGroup进行分组,一个Task内部会有一组Reader->Channel->Writer。Channel是连接Reader和Writer的数据交换通道,所有的数据都会经由Channel进行传输
并发实现
提升Datax Job内Channel并发数,并发数=taskGroup的数量。每一个TaskGroup并发执行的Task数(单个任务组的并发数量为5)
1、配置全局Byte限速以及单Channel Byte限速,channel个数=全局Byte限速/单channel byte限速
2、配置全局record限速以及单channel record限速,channel个数=全局record限速/单channel record限速
3、直接配置channel个数
配置含义:
job.setting.speed.channel : channel并发数
job.setting.speed.record : 全局配置channel的record限速
job.setting.speed.byte:全局配置channel的byte限速
core.transport.channel.speed.record:单channel的record限速
core.transport.channel.speed.byte:单channel的byte限速
2、配置全局record限速以及单channel record限速,channel个数=全局record限速/单channel record限速
3、直接配置channel个数
配置含义:
job.setting.speed.channel : channel并发数
job.setting.speed.record : 全局配置channel的record限速
job.setting.speed.byte:全局配置channel的byte限速
core.transport.channel.speed.record:单channel的record限速
core.transport.channel.speed.byte:单channel的byte限速
注意事项
当提升DataX Job内Channel并发数时,调整JVM heap参数,原因如下:
- 当一个Job内Channel数变多后,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。
- 例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止jvm报内存溢出等错误,调大jvm的堆参数。
- 通常我们建议将内存设置为4G或者8G,这个也可以根据实际情况来调整
- 调整JVM xms xmx参数的两种方式:一种是直接更改datax.py;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
Channel个数并不是越多越好, 原因如下:
- Channel个数的增加,带来的是更多的CPU消耗以及内存消耗。
- 如果Channel并发配置过高导致JVM内存不够用,会出现的情况是发生频繁的Full GC,导出速度会骤降,适得其反
- 当一个Job内Channel数变多后,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。
- 例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止jvm报内存溢出等错误,调大jvm的堆参数。
- 通常我们建议将内存设置为4G或者8G,这个也可以根据实际情况来调整
- 调整JVM xms xmx参数的两种方式:一种是直接更改datax.py;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
Channel个数并不是越多越好, 原因如下:
- Channel个数的增加,带来的是更多的CPU消耗以及内存消耗。
- 如果Channel并发配置过高导致JVM内存不够用,会出现的情况是发生频繁的Full GC,导出速度会骤降,适得其反
Json数据入仓方案及选择
1、spark解析:通过spark程序解析json文件再写入hive表中
2、getJsonObject():将整个Json看作一个字段先存入Hive表中,再通过Hive自带的函数解析再写入另一张表
3、JsonSerDe:通过hive兼容的json解析器直接将json数据解析到一张表中。(hive3.x之后提供原生的jsonSerDe)
2、getJsonObject():将整个Json看作一个字段先存入Hive表中,再通过Hive自带的函数解析再写入另一张表
3、JsonSerDe:通过hive兼容的json解析器直接将json数据解析到一张表中。(hive3.x之后提供原生的jsonSerDe)
SparkETL过程及实现
活跃、留存、流失、回流分别代表什么意思
活跃
如登录一次、访问时长超过10分组、消费一次等等,之后用户每次做出活跃行为,即记录为:活跃一次
留存
用户从指定时间开始,经历一段时间以后仍然有活跃行为,则记为一次留存。最常见的是新用户留存
流失
人为定义一个时间点为流失节点,比如用户12个月未登录之类。达到节点的,即为流失用户
回流
即先认定这部分用户已经流失,根据实际业务需求来定义,这部分流失用户在当前有登录,就认为这部分用户在当前时间回流
用户回流可以分为自主回流与人工回流,自主回流指玩家自己回流了,而人工回流就是人为导致的,这里的回流用户大部分为自主回流。
用户回流可以分为自主回流与人工回流,自主回流指玩家自己回流了,而人工回流就是人为导致的,这里的回流用户大部分为自主回流。
IP转地理位置的信息思路是什么
1、将dim(公共维度层)的进行广播——读取dim_area_dict表中geohash为绝对非空的值(绝对非空为geohash is not null and geohash !=''),再将非空数据转为rdd进行映射,映射为由geohash与对应省市区组成的元组,将这些元组进行广播即可
2、将ip2region的信息进行广播——读取hdfs中的ip2region.db文件,并通过IOUtils将文件中的内容转为字节数组,然后将这个字节数组进行广播
3、首先进行分区映射即mapPartitions,将广播的变量读取到分区中并且将其实例化,因为我们获得了广播出来的信息后需要将其实例化并对各个数据进行比对,如果使用map的话,由于map是对每条数据进行映射,而这样子做的话便会每有一条数据就对广播数据进行一次实例化,内存是吃不消的,而且对资源也是一种浪费,所以使用分区映射,即一个分区中的这些数据共享广播出来的数据,这样的话一个分区只要进行一次广播数据的实例化即可,节省了很多资源
4、读取到广播数据并将其实例化后,将迭代器(因为使用的是mapPartitions,所以操作的是保存了整个分区数据的迭代器)中的数据进行映射,因为web端的数据没有经纬度信息,所以web端的数据需要通过ip来实现地理位置集成,所以在映射中,首先需要判断是通过ip地址进行地理位置集成还是通过经纬度信息进行集成,首先默认为通过经纬度信息进行集成
5、获取迭代器中元素的经度维度,再将经纬度通过GeoHash获得对应位置的geostr即geo字符串,将geostr与广播的数据进行对比,若广播的数据包含该geostr,则将广播的数据中对应的省市区赋予该条映射数据;如果广播的数据不包含该geostr,则使用ip地址进行地理位置的集成
6、将映射数据的ip地址当作参数获得对应的字符串结果,将字符串结果切割为数组,通过数组获取省市的信息,便完成了地理位置的集成
2、将ip2region的信息进行广播——读取hdfs中的ip2region.db文件,并通过IOUtils将文件中的内容转为字节数组,然后将这个字节数组进行广播
3、首先进行分区映射即mapPartitions,将广播的变量读取到分区中并且将其实例化,因为我们获得了广播出来的信息后需要将其实例化并对各个数据进行比对,如果使用map的话,由于map是对每条数据进行映射,而这样子做的话便会每有一条数据就对广播数据进行一次实例化,内存是吃不消的,而且对资源也是一种浪费,所以使用分区映射,即一个分区中的这些数据共享广播出来的数据,这样的话一个分区只要进行一次广播数据的实例化即可,节省了很多资源
4、读取到广播数据并将其实例化后,将迭代器(因为使用的是mapPartitions,所以操作的是保存了整个分区数据的迭代器)中的数据进行映射,因为web端的数据没有经纬度信息,所以web端的数据需要通过ip来实现地理位置集成,所以在映射中,首先需要判断是通过ip地址进行地理位置集成还是通过经纬度信息进行集成,首先默认为通过经纬度信息进行集成
5、获取迭代器中元素的经度维度,再将经纬度通过GeoHash获得对应位置的geostr即geo字符串,将geostr与广播的数据进行对比,若广播的数据包含该geostr,则将广播的数据中对应的省市区赋予该条映射数据;如果广播的数据不包含该geostr,则使用ip地址进行地理位置的集成
6、将映射数据的ip地址当作参数获得对应的字符串结果,将字符串结果切割为数组,通过数组获取省市的信息,便完成了地理位置的集成
什么是漏斗分析模型
漏斗模型,是用来分析业务转化率的
分析师(需求方)定义的一种业务路径,用户沿着这个路径上的各个步骤,不断走向业务目标;路径上的各个步骤,人数通常是会逐步递减,形如一个漏斗;把这种分析形象地称呼为:漏斗分析模型
分析师(需求方)定义的一种业务路径,用户沿着这个路径上的各个步骤,不断走向业务目标;路径上的各个步骤,人数通常是会逐步递减,形如一个漏斗;把这种分析形象地称呼为:漏斗分析模型
什么是事件归因分析
归因分析,是一件非常复杂的运算:一个业务目标事件的达成,究竟是由哪些原因引起的,待归因事件通常是事先会定好候选项!
比如,一个业务目标x,能够由A,C,F事件引起;当一个用户完成了目标X,那么我们要分析他的x究竟是由ACF中哪一个事件所引起。
有如下计算策略(模型):
1、首次触点归因:待归因事件中,最早发生的事,被认为是导致业务结果的唯一因素
2、末次触点归因:待归因事件中,最近发生的事,被认为事导致业务结果的唯一因素
3、线性归因:待归因事件中,每一个事件都被认为对业务结果产生了影响,影响力平均分摊
4、位置归因:定义一个规则,比如最早、最晚事件占40%影响力,中间事件平摊影响力
5、时间衰减归因:越晚发生的待归因事件,对业务的影响力越大。
比如,一个业务目标x,能够由A,C,F事件引起;当一个用户完成了目标X,那么我们要分析他的x究竟是由ACF中哪一个事件所引起。
有如下计算策略(模型):
1、首次触点归因:待归因事件中,最早发生的事,被认为是导致业务结果的唯一因素
2、末次触点归因:待归因事件中,最近发生的事,被认为事导致业务结果的唯一因素
3、线性归因:待归因事件中,每一个事件都被认为对业务结果产生了影响,影响力平均分摊
4、位置归因:定义一个规则,比如最早、最晚事件占40%影响力,中间事件平摊影响力
5、时间衰减归因:越晚发生的待归因事件,对业务的影响力越大。
行转列、列转行
思路
数据重复问题
方式
行转列:explode()、split()、LATERAL VIEW
列转行:collect_set()、collect_list()、concat_ws()
列转行:collect_set()、collect_list()、concat_ws()
生产过程中碰到过什么问题
从0到1的数仓搭建
1、自下而上,从应用入手。根据应用层面一步一步搭建
2、根据业务,确定维度,进行数仓搭建
2、根据业务,确定维度,进行数仓搭建
维度建模
星型模型
星型模型与雪花模型的区别主要在于维度的层级,标准的星型模型维度只有一层,而雪花模型可能回设计较多层
雪花模型
比较靠近3NF,但是无法完全遵守,因为遵循3NF的性能成本太高。
星座模型
星座模型与需求有关,和设计无关
数仓分层
0 条评论
下一页