大数据数仓离线项目思维导图
2023-02-23 21:52:37 2 举报
AI智能生成
详解介绍了离线数仓设计中的SparkEtl流程和数仓分层详解。对于初学大数据的人来说,大有裨益。
作者其他创作
大纲/内容
需求分析与设计
数据仓库
数据仓库是一个各种数据(包括历史数据和当前数据)的中央存储系统,
应当提供数据的存储、管理和分析功能。
数据仓库能为数据挖掘、多维分析、决策支持、报表等系统和应用提供一致的、准确的、易用的数据。
应当提供数据的存储、管理和分析功能。
数据仓库能为数据挖掘、多维分析、决策支持、报表等系统和应用提供一致的、准确的、易用的数据。
数据建模
ER建模
用实体加关系描述的数据模型描述企业业务架构,
站在企业角度面向主题的抽象,而不是针对某个具体业务流程的实体对象关系抽象。
优点:规范性较好,冗余小,数据集成和数据一致性方面得到重视。
缺点:需要全面了解企业业务、数据和关系;实施周期非常长,成本昂贵;对建模人员的能力要求比较高。
站在企业角度面向主题的抽象,而不是针对某个具体业务流程的实体对象关系抽象。
优点:规范性较好,冗余小,数据集成和数据一致性方面得到重视。
缺点:需要全面了解企业业务、数据和关系;实施周期非常长,成本昂贵;对建模人员的能力要求比较高。
维度建模
事实表
发生在现实世界中的操作型事件,其所产生的可度量数值,存储在事实表中。
从最低的粒度级别来看,事实表行应对应一个度量时间,反之亦然。
从最低的粒度级别来看,事实表行应对应一个度量时间,反之亦然。
维度表
每个维度表都包含单一的主键列。维度表的主键可以作为与之关联的任何事实表的外键,
当然,维度表的描述环境与事实表行完全对应。
维度表通常比较宽,是扁平型非规范表,包含大量的低粒度的文本属性。
当然,维度表的描述环境与事实表行完全对应。
维度表通常比较宽,是扁平型非规范表,包含大量的低粒度的文本属性。
优缺点
优点:技术要求不高,快速上手,敏捷迭代,快速交付;更快速完成分析需求,
较好的大规模复杂查询的响应性能。
缺点:维度表的冗余会较多,视野狭窄。
较好的大规模复杂查询的响应性能。
缺点:维度表的冗余会较多,视野狭窄。
技术架构
- 以HDFS作为最底层存储
以Hive作为数仓基础设施
以spark作为核心运算引擎
以Flume、Datax、Azkaban、等作为外围粘合辅助系统
以Kylin/Clickhouse作为OLAP(联机数据分析)分析引擎
数据仓库分层
分层原因:1.用空间换时间。通过大量的预处理来提升应用系统的用户体验,
因此数据仓库会存在大量的冗余数据。
2.增强扩展性。不分层的花,如果源业务系统的业务规则发生变化将会影响整个数据清洗过程,工作量巨大。
3.分层管理。通过数据分层管理可以简化数据强袭的过程,因为把原来的一步工作分到了多个步骤去完成,
相当于把一个复杂的工作拆成了多个简单的工作,把一个大的黑盒变成了一个白盒。容易理解与调整。
因此数据仓库会存在大量的冗余数据。
2.增强扩展性。不分层的花,如果源业务系统的业务规则发生变化将会影响整个数据清洗过程,工作量巨大。
3.分层管理。通过数据分层管理可以简化数据强袭的过程,因为把原来的一步工作分到了多个步骤去完成,
相当于把一个复杂的工作拆成了多个简单的工作,把一个大的黑盒变成了一个白盒。容易理解与调整。
数仓分层优点
1.清晰数据结构。每一个数据分层都有它的作用域,可以更方便定位和理解。
2.方便血缘追踪。如果有一张表的源表出问题了,我们希望能够快速准确地定位到问题并解决。
3.减少重复的开发。开发一些通用的中间层数据,能够极大的减少重复计算。
4.把复杂问题简单化。每一层只处理单一的步骤,比较简单易理解,便于维护数据准确性,
当数据出现问题只需要从有问题的步骤开始修复。
5.屏蔽原始数据的异常。屏蔽业务的影响,不必改一次业务就需要重新接入数据。
2.方便血缘追踪。如果有一张表的源表出问题了,我们希望能够快速准确地定位到问题并解决。
3.减少重复的开发。开发一些通用的中间层数据,能够极大的减少重复计算。
4.把复杂问题简单化。每一层只处理单一的步骤,比较简单易理解,便于维护数据准确性,
当数据出现问题只需要从有问题的步骤开始修复。
5.屏蔽原始数据的异常。屏蔽业务的影响,不必改一次业务就需要重新接入数据。
数仓分层明细
dws
数据服务层
保留更少的维度,比如只有人。计算出更多的用户相关的指标。提供更多时间维度的数据。
保留更少的维度,比如只有人。计算出更多的用户相关的指标。提供更多时间维度的数据。
dwd
数据明细层
对ods层数据做一定的清洗和主题汇总。保留通用的维度。计算出常用的统计指标。
对ods层数据做一定的清洗和主题汇总。保留通用的维度。计算出常用的统计指标。
ods
原始数据层
本层的数据,总体上大多数是按照源头业务系统的分类方式而分类的。
该层最接近数据源中数据的一层,数据源中的数据,经过抽取、洗净、传输,也就是经历etl后装入ods。
本层的数据,总体上大多数是按照源头业务系统的分类方式而分类的。
该层最接近数据源中数据的一层,数据源中的数据,经过抽取、洗净、传输,也就是经历etl后装入ods。
ads
数据服务层
该层主要是提供数据产品和数据分析使用的数据,一般会存放在es,mysql等系统中供线上系统使用,
也可能会存在hive或者druid中供数据分析和数据挖掘使用。
该层主要是提供数据产品和数据分析使用的数据,一般会存放在es,mysql等系统中供线上系统使用,
也可能会存在hive或者druid中供数据分析和数据挖掘使用。
宽表窄表
宽表
从字面意义上讲就是字段比较多的数据库表。
通常是指业务主题相关的指标、维度、属性关联在一起的一张数据库表。
由于把不同的内容都放在同一张表存储,宽表已经不符合三范式的模型设计规范,随之带来的主要坏处就是数据的大量冗余,
与之相对应的就是查询性能的提高与便捷。
通常是指业务主题相关的指标、维度、属性关联在一起的一张数据库表。
由于把不同的内容都放在同一张表存储,宽表已经不符合三范式的模型设计规范,随之带来的主要坏处就是数据的大量冗余,
与之相对应的就是查询性能的提高与便捷。
窄表
严格按照数据库设计三范式。尽量减少数据冗余,但是缺点是修改一个数据可能需要修改多张表。
方便扩展,能适应各种复杂的数据结构,无论有多少配置,都不用修改表结构。但代码逻辑可能需要包装一下。
方便扩展,能适应各种复杂的数据结构,无论有多少配置,都不用修改表结构。但代码逻辑可能需要包装一下。
数据采集与处理
Flume
什么是flume
Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,
用于手机数据;同时,flume提供对数据进行简单处理,并写到各种数据接受放的能力。
用于手机数据;同时,flume提供对数据进行简单处理,并写到各种数据接受放的能力。
flume核心
source
source是数据的收集端,负责将数据捕获后进行特殊的格式化,讲数据封装到事件里,
然后将数据推入channel中。
然后将数据推入channel中。
channel
channel是连接source和sink的组件,可以将它看作一个数据的缓冲区。它可以将事件暂存到内存中也可以持久化到本地磁盘上,
直到sink处理完该事件。
直到sink处理完该事件。
sink
sink从channel中取出事件,然后将事件发送到别处,可以向文件系统、数据库、Hadoop存数据,
也可以是其它的agent的source。在日志数据较少时,可以将数据存储在文件系统中,并设定一点过的时间间隔保存数据。
也可以是其它的agent的source。在日志数据较少时,可以将数据存储在文件系统中,并设定一点过的时间间隔保存数据。
flume优缺点
1.当收集数据的速度超过将写入数据的时候,也就是当收集信息达到峰值时,
flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据。
2.flume的管道是基于事务,保证了数据在传送和接收时的一致性。
3.flume时可靠的,容错性高的,可升级的,易管理的,并且可定制的。
4.除了日志信息,flume同时也可以用来接入收集大规模的社交网络节点事件数据。
5.支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等。
flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据。
2.flume的管道是基于事务,保证了数据在传送和接收时的一致性。
3.flume时可靠的,容错性高的,可升级的,易管理的,并且可定制的。
4.除了日志信息,flume同时也可以用来接入收集大规模的社交网络节点事件数据。
5.支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等。
flume代码示例
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.88.101:3306/test
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = 123456
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.88.101:3306/test
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = 123456
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
sqoop
什么sqoop
主要用于Hadoop和关系数据库、数据仓库、nosql系统间传递数据。
通过sqoop我们可以方便地将数据从关系数据库导入到hdfs、hbase、hive,或者将数据从hdfs导出到关系数据库
通过sqoop我们可以方便地将数据从关系数据库导入到hdfs、hbase、hive,或者将数据从hdfs导出到关系数据库
sqoop优势
1.从工作模式角度看:sqoop是基于客户端模式的,用户使用客户端模式,只需要在一台机器上即可完成。
2.从mapreduce角度看:sqoop只提交一个map作业,数据的传输和转换都是使用mapper来完成的,
而且该mapreduce作业仅有mapper并不需要提供reducer,在执行sqoop时可以通过yarn监控页面看到。
3.从安全角度看:需要在执行时将用户名或者密码显性指定,也可以在配置文件中配置,总的来说安全性不是很高。
2.从mapreduce角度看:sqoop只提交一个map作业,数据的传输和转换都是使用mapper来完成的,
而且该mapreduce作业仅有mapper并不需要提供reducer,在执行sqoop时可以通过yarn监控页面看到。
3.从安全角度看:需要在执行时将用户名或者密码显性指定,也可以在配置文件中配置,总的来说安全性不是很高。
sqoop代码示例
sqoop import \
--connect jdbc:mysql://node01:3306/scott \
--username root \
--password 123456 \
--table emp \
--target-dir /sqoop/empall \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t";
--connect jdbc:mysql://node01:3306/scott \
--username root \
--password 123456 \
--table emp \
--target-dir /sqoop/empall \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t";
DataX
什么是datax
datax是一个异构数据源离线同步工具,致力于实现包括关系型数据库、
hdfs、hive、odps、hbase、ftp等各种异构数据源之间稳定高效的数据同步功能。
hdfs、hive、odps、hbase、ftp等各种异构数据源之间稳定高效的数据同步功能。
datax的优势
1.完美解决数据传输个别类型失真的问题。datax3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,
让数据可以完整无损的传输到目的端。
2.提供作业全链路的流量。datax3.0运行过程可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以了解作业状态。
3.提供脏数据检测。在数据传输过程中,势必会有很多数据传输报错,这种数据被datax认为是脏数据。datax目前可以实现脏数据精确过滤、
识别、采集、展示,为用户提供多种的脏数据处理模式,让用户精确把控数据质量大关。
让数据可以完整无损的传输到目的端。
2.提供作业全链路的流量。datax3.0运行过程可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以了解作业状态。
3.提供脏数据检测。在数据传输过程中,势必会有很多数据传输报错,这种数据被datax认为是脏数据。datax目前可以实现脏数据精确过滤、
识别、采集、展示,为用户提供多种的脏数据处理模式,让用户精确把控数据质量大关。
datax使用举例
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个200张分表的mysql数据同
步到odps里面。 DataX的调度决策思路是:
1. DataXJob根据分库分表切分成了200个Task。
2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
3. 4个TaskGroup平分切分好的200个Task,每一个TaskGroup负责以5个并发共计运行50个Task。
步到odps里面。 DataX的调度决策思路是:
1. DataXJob根据分库分表切分成了200个Task。
2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
3. 4个TaskGroup平分切分好的200个Task,每一个TaskGroup负责以5个并发共计运行50个Task。
datax数据抽取策略
1. 如果是实体表,如果数据量比较小,一般每天或者一周,一个月抽取一份全量的表.
如果是实体大表,一般每天抽取一份增量数据,然后按照分区表进行存储.这类数据都是先放到
ODS层.如果有需要,在DWD层每天做所有数据的滚动聚合,这时候DWD就有一份完整总数居.
2. 事实表,如订单,购物车等表.一般都是每天抽取一份增量数据,全量导入没必要,增量导入,然后每
天的数据以分区形式存储.也是存放到ODS贴源层.
如果需要做滚动合并,一般是在DWD层做每日数据滚动合并,这样DWD就会有一份完整的总数
居大表.
3. 维度表,和实体表也是一样,需要区分大表还是小表,一般10万条或者100万条以下算小表,具体
标准每个公司差异可能很大.毕竟对于大数据处理来说,百万条数据才到入门数据门槛.
如果是实体大表,一般每天抽取一份增量数据,然后按照分区表进行存储.这类数据都是先放到
ODS层.如果有需要,在DWD层每天做所有数据的滚动聚合,这时候DWD就有一份完整总数居.
2. 事实表,如订单,购物车等表.一般都是每天抽取一份增量数据,全量导入没必要,增量导入,然后每
天的数据以分区形式存储.也是存放到ODS贴源层.
如果需要做滚动合并,一般是在DWD层做每日数据滚动合并,这样DWD就会有一份完整的总数
居大表.
3. 维度表,和实体表也是一样,需要区分大表还是小表,一般10万条或者100万条以下算小表,具体
标准每个公司差异可能很大.毕竟对于大数据处理来说,百万条数据才到入门数据门槛.
datax配置代码示例
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader", reader名字
"parameter": { 需要同步的列名集合,使用json数组描述自带
信息,*代表所有列
"column": [], 具体的列和value
"connection": [ 连接信息
{
"jdbcUrl": [],对数据库的JDBC连接信息,使用JSON数组描
述,支持多个连接地址
"table": [] 需要同步的表,支持多个
【"querySql":[]】 自定义SQL,配置它后,mysqlreader直接
忽略table、column、where
}
],
"password": "", 数据库用户名对应的密码
"username": "", 数据库用户名
"where": "", 筛选条件
【"splitPK":"" 】 数据分片字段,一般是主键,仅支持整型
}
},
"writer": {
"name": "hdfswriter", writer名
"parameter": {
"column": [], 写入数据的字段,其中name指定字段名,type
指定类型
"compress": "", hdfs文件压缩类型,默认不填写意味着没有压
缩
"defaultFS": "", hdfs文件系统的namenode节点地址,格式:
hdfs://ip:端口
"fieldDelimiter": "", 字段分隔符
"fileName": "", 写入的文件名
"fileType": "", 文件的类型,目前只支持用户配置位“text”或
者“orc”
"path": "", 存储到Hadoop hdfs文件系统的路劲信息
"writeMode": "" hdfswriter写入前数据清理处理模式:
1)append:写入前不做任何处理,DataX
hdfsWroter直接使用 Filename写入,
并保证文件名不冲突
2)nonConfict:如果目录下有fileName前缀
的文件,直接报错
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
"job": {
"content": [
{
"reader": {
"name": "mysqlreader", reader名字
"parameter": { 需要同步的列名集合,使用json数组描述自带
信息,*代表所有列
"column": [], 具体的列和value
"connection": [ 连接信息
{
"jdbcUrl": [],对数据库的JDBC连接信息,使用JSON数组描
述,支持多个连接地址
"table": [] 需要同步的表,支持多个
【"querySql":[]】 自定义SQL,配置它后,mysqlreader直接
忽略table、column、where
}
],
"password": "", 数据库用户名对应的密码
"username": "", 数据库用户名
"where": "", 筛选条件
【"splitPK":"" 】 数据分片字段,一般是主键,仅支持整型
}
},
"writer": {
"name": "hdfswriter", writer名
"parameter": {
"column": [], 写入数据的字段,其中name指定字段名,type
指定类型
"compress": "", hdfs文件压缩类型,默认不填写意味着没有压
缩
"defaultFS": "", hdfs文件系统的namenode节点地址,格式:
hdfs://ip:端口
"fieldDelimiter": "", 字段分隔符
"fileName": "", 写入的文件名
"fileType": "", 文件的类型,目前只支持用户配置位“text”或
者“orc”
"path": "", 存储到Hadoop hdfs文件系统的路劲信息
"writeMode": "" hdfswriter写入前数据清理处理模式:
1)append:写入前不做任何处理,DataX
hdfsWroter直接使用 Filename写入,
并保证文件名不冲突
2)nonConfict:如果目录下有fileName前缀
的文件,直接报错
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
ETL开发模块
需求分析
清洗过滤
1.去除json数据体中的废弃字段(前端开发人员在埋点设计方案变更后遗留的无用字段。)
2.过滤掉json格式不正确的数据(脏数据)
3.过滤掉日志中缺少的关键字段(deviceid/properties/eventid/sessionid缺任何一个都不行)的记录
4.过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
5.对于web端日志,过滤掉爬虫请求数据(通过useragent标识来分析)
2.过滤掉json格式不正确的数据(脏数据)
3.过滤掉日志中缺少的关键字段(deviceid/properties/eventid/sessionid缺任何一个都不行)的记录
4.过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
5.对于web端日志,过滤掉爬虫请求数据(通过useragent标识来分析)
代码示例
def dataClear(spark:SparkSession,start_date:String,end_date:String):DataFrame = {
val curLog = spark.read.table("ods.ods_app_event_log")
.where(s" dt='$start_date' ")
import org.apache.spark.sql.functions._
val isNotBlank = udf((s: String) => {
StringUtils.isNotBlank(s)
})
//过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)的记录!
import spark.implicits._
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val startTime = format.parse(s"$start_date 00:00:00").getTime
val endTime = format.parse(s"$end_date 00:00:00").getTime
curLog.where(isNotBlank($"deviceid") and 'properties.isNotNull and isNotBlank(col("eventid")) and isNotBlank(curLog("sessionid")))
//过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
.where(s" timestamp >= $startTime and timestamp < $endTime")
}
val curLog = spark.read.table("ods.ods_app_event_log")
.where(s" dt='$start_date' ")
import org.apache.spark.sql.functions._
val isNotBlank = udf((s: String) => {
StringUtils.isNotBlank(s)
})
//过滤掉日志中缺少关键字段(deviceid/properties/eventid/sessionid 缺任何一个都不行)的记录!
import spark.implicits._
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val startTime = format.parse(s"$start_date 00:00:00").getTime
val endTime = format.parse(s"$end_date 00:00:00").getTime
curLog.where(isNotBlank($"deviceid") and 'properties.isNotNull and isNotBlank(col("eventid")) and isNotBlank(curLog("sessionid")))
//过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
.where(s" timestamp >= $startTime and timestamp < $endTime")
}
数据解析
将json打平,解析成扁平格式。properties字段不用扁平化,转成Map类型存储即可
代码示例
session分割
1.对于web端的日志,按天session分割,不需要处理
2.对于app日志,由于使用了会话保持策略,导致app进入后台很长时间后,再恢复前台,依然是同一个session,
不符合session分割的定义,需要按事件时间切割(业内通用:30min)
3.对于wx小程序日志,与app类似,session有效期很长,需要按事件时间切割(业内通用:30min)
2.对于app日志,由于使用了会话保持策略,导致app进入后台很长时间后,再恢复前台,依然是同一个session,
不符合session分割的定义,需要按事件时间切割(业内通用:30min)
3.对于wx小程序日志,与app类似,session有效期很长,需要按事件时间切割(业内通用:30min)
代码示例
import spark.implicits._
logBean.rdd.groupBy(bean => bean.sessionId)
.flatMap(tp => {
val iter = tp._2
val list = iter.toList.sortBy(bean => bean.timeStamp)
var tmp = UUID.randomUUID().toString
for (i <- 0 until list.size) {
list(i).splitedSessionId = tmp
if (i < list.size - 1 && (list(i + 1).timeStamp - list(i).timeStamp) > 30 * 60 * 1000)
tmp = UUID.randomUUID().toString
}
list
}).toDS()
logBean.rdd.groupBy(bean => bean.sessionId)
.flatMap(tp => {
val iter = tp._2
val list = iter.toList.sortBy(bean => bean.timeStamp)
var tmp = UUID.randomUUID().toString
for (i <- 0 until list.size) {
list(i).splitedSessionId = tmp
if (i < list.size - 1 && (list(i + 1).timeStamp - list(i).timeStamp) > 30 * 60 * 1000)
tmp = UUID.randomUUID().toString
}
list
}).toDS()
数据规范化处理
数据口径统一,例如:
字符串类型字段,在数据中有空串,有null值,统一为null值。还有日期格式统一等
字符串类型字段,在数据中有空串,有null值,统一为null值。还有日期格式统一等
代码示例
|select
| if(account='',null,account) as account ,
| appId ,
| appVersion ,
| carrier ,
| deviceId ,
| eventId ,
| ip ,
| latitude ,
| longitude ,
| netType ,
| osName ,
| osVersion ,
| properties ,
| resolution ,
| sessionId ,
| timeStamp ,
| null as SplitedsessionId,
| null as filledAccount,
| null as province,
| null as city,
| null as region,
| -1 as guid,
| 0 as isNew
|
|from
| data
|""".stripMargin)
standard.as[LogBean]
| if(account='',null,account) as account ,
| appId ,
| appVersion ,
| carrier ,
| deviceId ,
| eventId ,
| ip ,
| latitude ,
| longitude ,
| netType ,
| osName ,
| osVersion ,
| properties ,
| resolution ,
| sessionId ,
| timeStamp ,
| null as SplitedsessionId,
| null as filledAccount,
| null as province,
| null as city,
| null as region,
| -1 as guid,
| 0 as isNew
|
|from
| data
|""".stripMargin)
standard.as[LogBean]
数据集成
1.将日志中的GPS经纬度坐标解析成省、市、县信息。(为了方便后续的地域维度分析)
2.将日志中的IP地址解析成省、市、县信息。(为了方便后续的地域维度分析)
注:app日志和wxapp日志,有采集到的用户事件行为时的所在地gps坐标信息。
web日志则无法收集到用户的gps坐标,但可以收集到IP地址
gps坐标可以表达精确的地理位置,而IP地址只能表达准确度较低而且精度较低的地理位置
2.将日志中的IP地址解析成省、市、县信息。(为了方便后续的地域维度分析)
注:app日志和wxapp日志,有采集到的用户事件行为时的所在地gps坐标信息。
web日志则无法收集到用户的gps坐标,但可以收集到IP地址
gps坐标可以表达精确的地理位置,而IP地址只能表达准确度较低而且精度较低的地理位置
代码示例
ID-Mapping
为每一个用户每一条访问记录,标识一个全局唯一id
只使用deviceid来做用户标识的方案:
一部设备上可能出现A账号和B账号,就会被认为时一个人;同一个账号,在不同的设备上登录使用,这些行为数据会被认为是多个人。
只用account来做用户标识的方案:
一部设备上可能出现A账号和B账号,会被正确识别两个人;同一个账号,在不同设备上使用,这些行为数据会被正确识别为1个人。
只要识别出来一个用户,则为这个用户专门生成一个整数类型的自增的全局唯一id
只使用deviceid来做用户标识的方案:
一部设备上可能出现A账号和B账号,就会被认为时一个人;同一个账号,在不同的设备上登录使用,这些行为数据会被认为是多个人。
只用account来做用户标识的方案:
一部设备上可能出现A账号和B账号,会被正确识别两个人;同一个账号,在不同设备上使用,这些行为数据会被正确识别为1个人。
只要识别出来一个用户,则为这个用户专门生成一个整数类型的自增的全局唯一id
代码示例
spark.sql(
s"""
|insert overwrite table dwd.dwd_app_event_detail partition(dt='$start_date')
|select
| account ,
| appId ,
| appVersion ,
| carrier ,
| deviceId ,
| eventId ,
| ip ,
| latitude ,
| longitude ,
| netType ,
| osName ,
| osVersion ,
| properties ,
| resolution ,
| sessionId ,
| timeStamp ,
| splitedSessionId,
| filledAccount,
| province,
| city,
| region,
| guid,
| isnew
|
|from
| res
|""".stripMargin)
s"""
|insert overwrite table dwd.dwd_app_event_detail partition(dt='$start_date')
|select
| account ,
| appId ,
| appVersion ,
| carrier ,
| deviceId ,
| eventId ,
| ip ,
| latitude ,
| longitude ,
| netType ,
| osName ,
| osVersion ,
| properties ,
| resolution ,
| sessionId ,
| timeStamp ,
| splitedSessionId,
| filledAccount,
| province,
| city,
| region,
| guid,
| isnew
|
|from
| res
|""".stripMargin)
新老访客标记
新访客,标记为1
老访客,标记为0
老访客,标记为0
代码示例
import spark.implicits._
val anonymousFilled = spark.sql(
s"""
|
|select
| areaed.account ,
| areaed.appId ,
| areaed.appVersion ,
| areaed.carrier ,
| areaed.deviceId ,
| areaed.eventId ,
| areaed.ip ,
| areaed.latitude ,
| areaed.longitude ,
| areaed.netType ,
| areaed.osName ,
| areaed.osVersion ,
| areaed.properties ,
| areaed.resolution ,
| areaed.sessionId ,
| areaed.timeStamp ,
| areaed.SplitedsessionId,
| nvl(areaed.account,o2.account)filledAccount,
| areaed.province,
| areaed.city,
| areaed.region,
| areaed.guid,
| areaed.isNew
|from
| areaed
|left join
| (select
| deviceid,
| account
| from
| (select
| deviceid,
| account,
| row_number() over(partition by deviceid order by score desc,last_login desc) as r
| from
| dws.dws_device_account_bind_score
| where
| dt='$start_date' and account is not null
| ) o1
| where r=1
| )o2
| on areaed.deviceid = o2.deviceid
|
|""".stripMargin)
val anonymousFilled = spark.sql(
s"""
|
|select
| areaed.account ,
| areaed.appId ,
| areaed.appVersion ,
| areaed.carrier ,
| areaed.deviceId ,
| areaed.eventId ,
| areaed.ip ,
| areaed.latitude ,
| areaed.longitude ,
| areaed.netType ,
| areaed.osName ,
| areaed.osVersion ,
| areaed.properties ,
| areaed.resolution ,
| areaed.sessionId ,
| areaed.timeStamp ,
| areaed.SplitedsessionId,
| nvl(areaed.account,o2.account)filledAccount,
| areaed.province,
| areaed.city,
| areaed.region,
| areaed.guid,
| areaed.isNew
|from
| areaed
|left join
| (select
| deviceid,
| account
| from
| (select
| deviceid,
| account,
| row_number() over(partition by deviceid order by score desc,last_login desc) as r
| from
| dws.dws_device_account_bind_score
| where
| dt='$start_date' and account is not null
| ) o1
| where r=1
| )o2
| on areaed.deviceid = o2.deviceid
|
|""".stripMargin)
地理位置信息集成
GEOHASH编码:在地球经纬度范围内,不断通过二分划分矩形范围,通过观察gps坐标点所落的范围,
来反复生成0/1二进制码
来反复生成0/1二进制码
代码示例
val areaDict = spark.read.table("dim.dim_area_dict")
.where("geohash is not null and geohash != '' ")
val gpsDictMap = areaDict.rdd.map({ case Row(geohash: String, province: String, city: String, region: String)
=> (geohash, (province, city, region))
}).collectAsMap()
//广播数据
val bc = spark.sparkContext.broadcast(gpsDictMap)
.where("geohash is not null and geohash != '' ")
val gpsDictMap = areaDict.rdd.map({ case Row(geohash: String, province: String, city: String, region: String)
=> (geohash, (province, city, region))
}).collectAsMap()
//广播数据
val bc = spark.sparkContext.broadcast(gpsDictMap)
数据质量监控
需求分析
从ods层经过etl后存入到dwd层的数据,我们需要了解经过处理数据的数据质量如何,
例如地区的填充率,如果有大量的地区字段为空,说明数据处理的质量较低,需要优化和完善。
例如地区的填充率,如果有大量的地区字段为空,说明数据处理的质量较低,需要优化和完善。
代码示例
#!/bin/bash
cur_day=$(date -d'-1 day' +'%Y-%m-%d')
if [ $# -eq 1 ]
then
cur_day=$1
fi
sql="
insert into table dq.dq_dwd_app_event_log_area
select
'${cur_day}' as dt,
count(if(province is null,1,null)) as province_null,
count(province) as province_notnull,
count(if(city is null,1,null)) as city_null,
count(city) as city_notnull,
count(if(region is null,1,null)) as region_null,
count(region) as region_notnull,
count(1) as table_record_cnt
from
dwd.dwd_app_event_detail
where
dt='${cur_day}'
group by dt ;
hive -e "${sql}"
if [ $? -eq 0 ]
then
echo "日志数据日期:${cur_day}; 目标表:dq.dq_dwd_app_event_log_area"+"状态:成
功"
exit 0
else
echo "日志数据日期:${cur_day}; 目标表:dq.dq_dwd_app_event_log_area"+"状态:失
败"
exit 1
fi
cur_day=$(date -d'-1 day' +'%Y-%m-%d')
if [ $# -eq 1 ]
then
cur_day=$1
fi
sql="
insert into table dq.dq_dwd_app_event_log_area
select
'${cur_day}' as dt,
count(if(province is null,1,null)) as province_null,
count(province) as province_notnull,
count(if(city is null,1,null)) as city_null,
count(city) as city_notnull,
count(if(region is null,1,null)) as region_null,
count(region) as region_notnull,
count(1) as table_record_cnt
from
dwd.dwd_app_event_detail
where
dt='${cur_day}'
group by dt ;
hive -e "${sql}"
if [ $? -eq 0 ]
then
echo "日志数据日期:${cur_day}; 目标表:dq.dq_dwd_app_event_log_area"+"状态:成
功"
exit 0
else
echo "日志数据日期:${cur_day}; 目标表:dq.dq_dwd_app_event_log_area"+"状态:失
败"
exit 1
fi
实际需求中的数据质量监控
1.记录数检查法:
通过比较记录条数,对数据情况进行概括性验证。主要是检查数据表的记录数是否为确定的数值或在确定的范围内。
适用范围:
对于数据表中按日期进行增量加载的数据,每个加载周期递增的记录数为常数值或可以确定的范围时,必须进行记录条数检验。
2.关键指标总量验证法:
对于关键指标,对比数据总量是否一致。主要是指具有相同业务含义,从不同维度统计的汇总逻辑的检查。
适用范围:
同表内对同个字段从不同的维度进行统计,存在汇总关系时必须进行总量检验。本表的字段与其他表中的字段具有相同的业务含义,
从不同的维度统计,存在汇总关系,且两张表的数据不是经同一数据源加工得到,满足此条件时必须进行质量检验。
例如:企业的总收入、总利润、总费用、总投资等指标。
3.历史数据对比法(重点):
通过历史数据观察数据变化规律,从而验证数据质量。通常以同比发展速度进行评判。评估时应根据各种指标发展特点,
重点对同比发展速度增幅(或降幅)较大的数据进行审核。历史数据对比法包括同比和环比两种方式。
适用范围:
不能进行记录数检查法、关键指标总量验证法,且事实表的记录数小于1000万条时必须进行历史数据对比法。
4.值域判断法(重点):
确定一定时期内指标数据合理的变动区间,对区间外的数据进行重点审核。其中数据的合理变动区间范围是直接根据业务经验来确定的。
适用范围:
事实表中的字段可以确定取值范围,同时可以判断不在此范围内的数据必定是错误的,满足此条件必须精心值域判断法。
例如:基于年龄维度统计在职员工的数量,低于18岁,高于65岁的数据属于异常数据,应重点审核。
5.经验审核法:
针对报表中指标间逻辑关系仅靠计算机程序审核无法确认、量化,或有些审核虽设定数量界限,但界限较宽不好判定的情况,需要增加人工经验审核。
适用范围:
无法量化或量化界限无法评定的情况,使用人工经验审核法。例如:某数据安全事故对企业声誉的影响程度。
6.匹配判断法:
与相关部门提供或发布的有关数据进行对比验证。
适用范围:
与相关部门提供或发布的有关数据口径一直的,可以使用匹配判断法。例如:上市公司的净资产收益率,总资产负债率。
通过比较记录条数,对数据情况进行概括性验证。主要是检查数据表的记录数是否为确定的数值或在确定的范围内。
适用范围:
对于数据表中按日期进行增量加载的数据,每个加载周期递增的记录数为常数值或可以确定的范围时,必须进行记录条数检验。
2.关键指标总量验证法:
对于关键指标,对比数据总量是否一致。主要是指具有相同业务含义,从不同维度统计的汇总逻辑的检查。
适用范围:
同表内对同个字段从不同的维度进行统计,存在汇总关系时必须进行总量检验。本表的字段与其他表中的字段具有相同的业务含义,
从不同的维度统计,存在汇总关系,且两张表的数据不是经同一数据源加工得到,满足此条件时必须进行质量检验。
例如:企业的总收入、总利润、总费用、总投资等指标。
3.历史数据对比法(重点):
通过历史数据观察数据变化规律,从而验证数据质量。通常以同比发展速度进行评判。评估时应根据各种指标发展特点,
重点对同比发展速度增幅(或降幅)较大的数据进行审核。历史数据对比法包括同比和环比两种方式。
适用范围:
不能进行记录数检查法、关键指标总量验证法,且事实表的记录数小于1000万条时必须进行历史数据对比法。
4.值域判断法(重点):
确定一定时期内指标数据合理的变动区间,对区间外的数据进行重点审核。其中数据的合理变动区间范围是直接根据业务经验来确定的。
适用范围:
事实表中的字段可以确定取值范围,同时可以判断不在此范围内的数据必定是错误的,满足此条件必须精心值域判断法。
例如:基于年龄维度统计在职员工的数量,低于18岁,高于65岁的数据属于异常数据,应重点审核。
5.经验审核法:
针对报表中指标间逻辑关系仅靠计算机程序审核无法确认、量化,或有些审核虽设定数量界限,但界限较宽不好判定的情况,需要增加人工经验审核。
适用范围:
无法量化或量化界限无法评定的情况,使用人工经验审核法。例如:某数据安全事故对企业声誉的影响程度。
6.匹配判断法:
与相关部门提供或发布的有关数据进行对比验证。
适用范围:
与相关部门提供或发布的有关数据口径一直的,可以使用匹配判断法。例如:上市公司的净资产收益率,总资产负债率。
业务域ods开发
DWD层设计开发
本层主要表类型:存储各业务表的全量快照
存储各业务表的拉链表
快照表和拉链表,都是分区全量表。只不过,快照表需要保存每一天的分区,才能查询到每一天的该表的数据状态。
而拉链表,则只需要保留最后一天的分区即可。
存储各业务表的拉链表
快照表和拉链表,都是分区全量表。只不过,快照表需要保存每一天的分区,才能查询到每一天的该表的数据状态。
而拉链表,则只需要保留最后一天的分区即可。
拉链表概念及实现逻辑
什么是拉链表
以订单表为例,表中90%的数据基本不会随着时间而变化,只有最近一段时间内的数据会有变化
对于这种类型的表,我们往往需要保存好每一条数据的每一天的状态。
对于这种类型的表,我们往往需要保存好每一条数据的每一天的状态。
方案一
优点:可以每天保存一份全量表,并长期存储,这样可以实现每天状态的保存,也方便查询任何一天中数据的状态
弊端:由于表中90%的数据都不会变化,因此,各天的全量表,其实大量数据都是相同的,存储冗余度太高。
弊端:由于表中90%的数据都不会变化,因此,各天的全量表,其实大量数据都是相同的,存储冗余度太高。
方案二
使用拉链表模型,来实现每条数据每天的状态变化情况
优点:既能保留每天状态,又比较节省存储空间
弊端:使用、查询的时候,略增加了一点复杂性。
优点:既能保留每天状态,又比较节省存储空间
弊端:使用、查询的时候,略增加了一点复杂性。
拉链表计算sql
with a as (
select *
from test.lalian where dt='2021-01-22'
)
,b as (
select * from test.add where dt='2021-01-23'
)
insert into table test.lalian partition(dt='2021-01-23')
select
a.oid,
a.amount,
a.status,
a.start_dt,
if(a.end_dt='9999-12-31' and b.oid is not null,a.dt,a.end_dt) as end_dt
from a left join b on a.oid = b.oid
union all
select
oid,
amount,
status,
dt as start_dt,
'9999-12-31' as end_dt
from b
select *
from test.lalian where dt='2021-01-22'
)
,b as (
select * from test.add where dt='2021-01-23'
)
insert into table test.lalian partition(dt='2021-01-23')
select
a.oid,
a.amount,
a.status,
a.start_dt,
if(a.end_dt='9999-12-31' and b.oid is not null,a.dt,a.end_dt) as end_dt
from a left join b on a.oid = b.oid
union all
select
oid,
amount,
status,
dt as start_dt,
'9999-12-31' as end_dt
from b
流量主题
需求说明
在业务表中对数据的各个维度来进行统计
利用高阶聚合函数,一次性计算多维分析报表
利用高阶聚合函数,一次性计算多维分析报表
流量分析-多维cube表
多维表作用:
与需求人员进行沟通,得到终端需求所关心的所有维度组合;
然后可以把这些维度组合的报表,一次性写好,放入定时调度系统,每天进行例行计算
然后可以把这些维度组合的报表,一次性写好,放入定时调度系统,每天进行例行计算
大致层级
一个维度所有可能的取值的个数,叫做这个维度基数。
像省市区,几百个信息,低维度
像用户id,手机号等等,千万甚至上亿,高维度
再高,超高维度,会造成cube膨胀,再进行维度组和产生数据量比原始数据量大出数倍。
像省市区,几百个信息,低维度
像用户id,手机号等等,千万甚至上亿,高维度
再高,超高维度,会造成cube膨胀,再进行维度组和产生数据量比原始数据量大出数倍。
代码示例
--利用高阶聚合函数,一次性计算多维分析报表(cube表)
with tmp as(
select
nvl(account ,'UNKOWN')as account
,nvl(appid ,'UNKOWN')as appid
,nvl(appversion ,'UNKOWN')as appversion
,nvl(carrier ,'UNKOWN')as carrier
,nvl(deviceid ,'UNKOWN')as deviceid
,nvl(eventid ,'UNKOWN')as eventid
,nvl(ip ,'UNKOWN')as ip
,nvl(latitude ,'UNKOWN')as latitude
,nvl(longitude ,'UNKOWN')as longitude
,nvl(nettype ,'UNKOWN')as nettype
,nvl(osname ,'UNKOWN')as osname
,nvl(osversion ,'UNKOWN')as osversion
,nvl(resolution ,'UNKOWN')as resolution
,nvl(sessionid ,'UNKOWN')as sessionid
,nvl(`timestamp` ,'UNKOWN')as `timestamp`
,nvl(filledaccount ,'UNKOWN')as filledaccount
,nvl(province ,'UNKOWN')as province
,nvl(city ,'UNKOWN')as city
,nvl(region ,'UNKOWN')as region
,nvl(guid ,'UNKOWN')as guid
,nvl(isnew ,'UNKOWN')as isnew
,nvl(page_acc_tml ,'UNKOWN')as page_acc_tml
,nvl(splitedsessionid,'UNKOWN')as splitedsessionid
,nvl(enter_page_id ,'UNKOWN')as enter_page_id
,nvl(exit_page_id ,'UNKOWN')as exit_page_id
,nvl(start_time ,'UNKOWN')as start_time
,nvl(end_time ,'UNKOWN')as end_time
,nvl(is_jumpout ,'UNKOWN')as is_jumpout
,nvl(pv_cnt ,'UNKOWN')as pv_cnt
from
dws.dws_app_tfc_topic
where
dt='2021-01-01'
)
insert into table ads.ads_app_tfc_cube partition(dt='2021-01-01')
select
appid
,appversion
,carrier
,nettype
,osname
,province
,city
,region
,isnew
,enter_page_id
,exit_page_id
,is_jumpout
,count(1) pv_cnt
,count(distinct guid) as uv_cnt
,count(distinct splitedsessionid) as ses_cnt
,sum(page_acc_tml) as acc_tml
,sum(page_acc_tml)/count(distinct splitedsessionid) as avg_ses_tml
,count(distinct ip) as ip_cnt
,count(distinct if(is_jumpout='Y',sessionid,null)) as jpt_ses_cnt
from
tmp
group by
appid
,appversion
,carrier
,nettype
,osname
,province
,city
,region
,isnew
,enter_page_id
,exit_page_id
,is_jumpout
grouping sets(
(appid)
,(appid,appversion)
,(osname)
,(carrier,nettype)
,(province)
,(province,city)
,(province,city,region)
,(isnew)
,(is_jumpout)
,(province,isnew)
,(enter_page_id)
,(exit_page_id)
,()
);
with tmp as(
select
nvl(account ,'UNKOWN')as account
,nvl(appid ,'UNKOWN')as appid
,nvl(appversion ,'UNKOWN')as appversion
,nvl(carrier ,'UNKOWN')as carrier
,nvl(deviceid ,'UNKOWN')as deviceid
,nvl(eventid ,'UNKOWN')as eventid
,nvl(ip ,'UNKOWN')as ip
,nvl(latitude ,'UNKOWN')as latitude
,nvl(longitude ,'UNKOWN')as longitude
,nvl(nettype ,'UNKOWN')as nettype
,nvl(osname ,'UNKOWN')as osname
,nvl(osversion ,'UNKOWN')as osversion
,nvl(resolution ,'UNKOWN')as resolution
,nvl(sessionid ,'UNKOWN')as sessionid
,nvl(`timestamp` ,'UNKOWN')as `timestamp`
,nvl(filledaccount ,'UNKOWN')as filledaccount
,nvl(province ,'UNKOWN')as province
,nvl(city ,'UNKOWN')as city
,nvl(region ,'UNKOWN')as region
,nvl(guid ,'UNKOWN')as guid
,nvl(isnew ,'UNKOWN')as isnew
,nvl(page_acc_tml ,'UNKOWN')as page_acc_tml
,nvl(splitedsessionid,'UNKOWN')as splitedsessionid
,nvl(enter_page_id ,'UNKOWN')as enter_page_id
,nvl(exit_page_id ,'UNKOWN')as exit_page_id
,nvl(start_time ,'UNKOWN')as start_time
,nvl(end_time ,'UNKOWN')as end_time
,nvl(is_jumpout ,'UNKOWN')as is_jumpout
,nvl(pv_cnt ,'UNKOWN')as pv_cnt
from
dws.dws_app_tfc_topic
where
dt='2021-01-01'
)
insert into table ads.ads_app_tfc_cube partition(dt='2021-01-01')
select
appid
,appversion
,carrier
,nettype
,osname
,province
,city
,region
,isnew
,enter_page_id
,exit_page_id
,is_jumpout
,count(1) pv_cnt
,count(distinct guid) as uv_cnt
,count(distinct splitedsessionid) as ses_cnt
,sum(page_acc_tml) as acc_tml
,sum(page_acc_tml)/count(distinct splitedsessionid) as avg_ses_tml
,count(distinct ip) as ip_cnt
,count(distinct if(is_jumpout='Y',sessionid,null)) as jpt_ses_cnt
from
tmp
group by
appid
,appversion
,carrier
,nettype
,osname
,province
,city
,region
,isnew
,enter_page_id
,exit_page_id
,is_jumpout
grouping sets(
(appid)
,(appid,appversion)
,(osname)
,(carrier,nettype)
,(province)
,(province,city)
,(province,city,region)
,(isnew)
,(is_jumpout)
,(province,isnew)
,(enter_page_id)
,(exit_page_id)
,()
);
子主题
BitMap工具
什么是bitmap
bitmap,即位图,使用每个位表示某种状态,适合处理整型的海量数据。本质上是哈希表的一种应用实现,
原理也很简单,给定一个int整型数据,将该int整数映射到对应的位上,并将该位由0改为1.
原理也很简单,给定一个int整型数据,将该int整数映射到对应的位上,并将该位由0改为1.
bitmap的作用
基础聚合表,并不是最终终端用户所需要的报表,是一个相对较细粒度的聚合表累加的指标,可以直接层级累加类的指标,
不能直接层级累加;我们采用bitmap数据结构来实现逐层聚合。
不能直接层级累加;我们采用bitmap数据结构来实现逐层聚合。
收藏
收藏
0 条评论
下一页