电商数据仓库
2024-03-05 20:19:28 8 举报
AI智能生成
数据仓库是一种用于存储、处理和分析大量数据的技术,它提供了一个集中的存储和管理数据环境。数据仓库可以用于提取、转换、加载(ETL)流程,以集成来自不同数据源的数据,如关系数据库、数据湖和云存储。通过数据仓库,可以执行数据分析和数据挖掘,以获取业务洞察和决策支持。常见的数据仓库工具包括Hadoop、Teradata、Oracle和Microsoft SQL Server。
作者其他创作
大纲/内容
其他
Shell 中单引号和双引号区别
- 单引号不取变量值
- 双引号取变量值
- 反引号`,执行引号中命令
- 双引号内部嵌套单引号,取出变量值
- 单引号内部嵌套双引号,不取出变量值
用户行为采集平台
数据仓库概念
数据仓库,是为企业所有决策指定过程,提供所有系统数据支持的战略集合。
通过对数据仓库中数据的分析,可以帮助企业,改进业务流程、控制成本、提高产品质量等。
数据仓库,并不是数据的最终目的地,而是为数据最终的目的地做好准备。这些包括数据的:清洗、转义、分类、重组、合并、拆分、统计等。
通过对数据仓库中数据的分析,可以帮助企业,改进业务流程、控制成本、提高产品质量等。
数据仓库,并不是数据的最终目的地,而是为数据最终的目的地做好准备。这些包括数据的:清洗、转义、分类、重组、合并、拆分、统计等。
数据仓库的输入数据源和输出系统分别是什么?
输入系统:埋点产生的用户行为数据、JavaEE 后台产生的业务数据。
输出系统:报表系统、用户画像系统、推荐系统
集群规模计算
日活量为100万,没人一天平均100条日志记录:100万 * 100 条 = 1亿条
每条日志1k : 100000000 / 1024 /1024 = 100G
半年不扩容:100G * 180天 = 18T
保存3个副本:18T * 3 = 54 T
预留20%-30%磁盘容量=54T/0.7 = 77T
约 8T * 10台服务器
考虑数仓,压缩,需要重新计算
系统流程设计
版本选择
apache
运维麻烦,组件兼容性需要自己调研
CDH
不开源,需要收费,按节点收费
HDP
开源,没有CDH稳定,国内使用较少,可以二次开发
数据采集模块
环境准备
无密钥登陆
ssh-keygen -t rsa
ssh-copy-id
ssh-copy-id
Java环境
Linux 环境变量
1)修改/etc/profile 文件:用来设置系统环境参数,比如$PATH. 这里面的环境变量是对 系统内所有用户生效。使用 bash 命令,需要 source /etc/profile 一下。
2)修改~/.bashrc 文件:针对某一个特定的用户,环境变量的设置只对该用户自己有效。 使用 bash 命令,只要以该用户身份运行命令行就会读取该文件。
3)把/etc/profile 里面的环境变量追加到~/.bashrc 目录
4)说明
登录式 Shell,采用用户名比如 atguigu 登录,会自动加载/etc/profile
非登录式 Shell,采用 ssh 比如 ssh hadoop103 登录,不会自动加载/etc/profile,会自动 加载~/.bashrc
登录式 Shell,采用用户名比如 atguigu 登录,会自动加载/etc/profile
非登录式 Shell,采用 ssh 比如 ssh hadoop103 登录,不会自动加载/etc/profile,会自动 加载~/.bashrc
Hadoop安装
集群规划
NameNode、Resourcemanager、SecondaryNameNode
三个节点尽量不要在同一个机器上
三个节点尽量不要在同一个机器上
配置文件8个
core-site.xml
hadoop-env.sh
yarn-env.sh
yarn-site.xml
mapred-env.sh
mapred-site.xml
slaves
hdfs-site.xml
格式化集群
hadoop namenode -format
启动集群
sbin/start-dfs.sh
sbin/start-yarn.sh
注意:NameNode 和 ResourceManger 如果不是同一台机器,不能在 NameNode 上启 动 YARN,应该在 ResouceManager 所在的机器上启动 YARN
sbin/start-yarn.sh
注意:NameNode 和 ResourceManger 如果不是同一台机器,不能在 NameNode 上启 动 YARN,应该在 ResouceManager 所在的机器上启动 YARN
项目经验
存储多目录
1)在 DataNode 节点增加磁盘并进行挂载。
2)在 hdfs-site.xml 文件中配置多目录,注意新挂载磁盘的访问权限问题。
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/ data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
</property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/ data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
</property>
3)增加磁盘后,保证每个目录数据均衡
bin/start-balancer.sh –threshold 10
对于参数 10,代表的是集群中各个节点的磁盘空间利用率相差不超过 10%,可根据实际情况进行调整
对于参数 10,代表的是集群中各个节点的磁盘空间利用率相差不超过 10%,可根据实际情况进行调整
bin/stop-balancer.sh
注意:于HDFS 需要启动单独的Rebalance Server 来执行Rebalance 操作,所以尽量不
要在NameNode 上执行start-balancer.sh,而是找一台比较空闲的机器。
注意:于HDFS 需要启动单独的Rebalance Server 来执行Rebalance 操作,所以尽量不
要在NameNode 上执行start-balancer.sh,而是找一台比较空闲的机器。
LZO 压缩配置
1) hadoop 本身并不支持 lzo 压缩,故需要使用 twitter 提供的 hadoop-lzo 开源组件。 hadoop-lzo 需依赖 hadoop 和 lzo 进行编译
环境准备
maven
yum install -y gcc-c++ zlib-devel autoconf automake
下载、编译安装lzo
wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
解压
./configure -prefix=/usr/local/hadoop/lzo/
make
make install
make
make install
编译hadoop-1 zo源码
下载hadoop-lzo的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/niaster.zip
修改pom. xml
<hadoop.current.version>
2. 7. 2
</hadoop.current.version>
<hadoop.current.version>
2. 7. 2
</hadoop.current.version>
声明两个临时环境变量
export C_INCLUDE_PATH=/usr/Iocal/hadoop/Izo/inc1ude
export LIBRARY_PATH=/usr/local/hadoop/1zo/1ib
export C_INCLUDE_PATH=/usr/Iocal/hadoop/Izo/inc1ude
export LIBRARY_PATH=/usr/local/hadoop/1zo/1ib
进入hadoop-lzo-master
mvn package -Dmaven.test.skip=true
mvn package -Dmaven.test.skip=true
2)将编译好后的 hadoop-lzo-0.4.20.jar 放入 hadoop-2.7.2/share/hadoop/common/
3)同步 hadoop-lzo-0.4.20.jar 到集群
4)core-site.xml 增加配置支持 LZO 压缩
<property> <name>io.compression.codecs</name> <value> org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.SnappyCodec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec </value> </property> <property> <name>io.compression.codec.lzo.class</name> <value>com.hadoop.compression.lzo.LzoCodec</value> </property>
5)同步 core-site.xml 到 集群
6)重启集群
LZO 创建索引
创建 LZO 文件的索引,LZO 压缩文件的可切片特性依赖于其索引,故我们需要手动为 LZO 压缩文件创建索引。若无索引,则 LZO 文件的切片只有一个。
hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo
基准测试
向 HDFS 集群写 10 个 128M 的文件
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-j obclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-j obclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
读取 HDFS 集群 10 个 128M 的文件
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-j obclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-j obclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
删除测试生成数据
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapredu ce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapredu ce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean
使用 Sort 程序评测 MapReduce
(1)使用 RandomWriter 来产生随机数,每个节点运行 10 个 Map 任务,每个 Map 产 生大约 1G 大小的二进制随机数
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapredu ce-examples-2.7.2.jar randomwriter random-data
(2)执行 Sort 程序
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapredu ce-examples-2.7.2.jar sort random-data sorted-data
(3)验证数据是否真正排好序了
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapredu ce-client-jobclient-2.7.2-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data
Hadoop 参数调优
1)HDFS 参数调优 hdfs-site.xml dfs.namenode.handler.count=20 * log2
比如集群规模为 8 台时,此参数设 置为 60
比如集群规模为 8 台时,此参数设 置为 60
2)YARN 参数调优 yarn-site.xml
(1)情景描述:总共 7 台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive 面临问题:数据统计主要用 HiveSQL,没有数据倾斜,小文件已经做了合并处理,开 启的 JVM 重用,而且 IO 没有阻塞,内存用了不到 50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
(a)yarn.nodemanager.resource.memory-mb
表示该节点上 YARN 可使用的物理内存总量,默认是 8192(MB),注意,如果你的节点
内存资源不够 8GB,则需要调减小这个值,而 YARN 不会智能的探测节点的物理内存总量。
(b)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是 8192(MB)。
(a)yarn.nodemanager.resource.memory-mb
表示该节点上 YARN 可使用的物理内存总量,默认是 8192(MB),注意,如果你的节点
内存资源不够 8GB,则需要调减小这个值,而 YARN 不会智能的探测节点的物理内存总量。
(b)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是 8192(MB)。
3)Hadoop 宕机
(1)如果 MR 造成系统宕机。此时要控制 Yarn 同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是 8192MB)
(2)如果写入文件过量造成 NameNode 宕机。那么调高 Kafka 的存储大小,控制从 Kafka到 HDFS 的写入速度。高峰期的时候用 Kafka 进行缓存,高峰期过去数据同步会自动跟上。
Zookeeper安装
配置文件
zoo.cfg
mkdir zkData
myid 不同
不是越多越好,也不是越少越好。 如果多,通信时间长,效率低;如果太少,可靠性差。
半数机制,安装奇数台
10 台服务器几台:3 台
20 台服务器几台:5 台
100 台服务器几台:11 台
10 台服务器几台:3 台
20 台服务器几台:5 台
100 台服务器几台:11 台
采集日志Flume安装
Flume 组件
1)Source
(1)TaildirSource 相比 ExecSource、SpoolingDirectorySource 的优势
TailDirSource:断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。
ExecSource 可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失。
SpoolingDirectory Source 监控目录,不支持断点续传。
(2)batchSize 大小如何设置?
Event1K 左右时,500-1000 合适(默认为 100)
2)Channel
FileChannel
FileChannel 传输速度相对于 Memory 慢,但数据安全保障高,Agent 进程挂掉也可以从失败中恢复数据。
金融类公司、对钱要求非常准确的公司通常会选择 FileChannel
MemoryChannel
MemoryChannel 传输数据速度更快,但因为数据保存在 JVM 的堆内存中,Agent 进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
传输的是普通日志信息(京东内部一天丢 100 万-200 万条,这是非常正常的),通常 选择 MemoryChannel。
Kafka Channel
采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 数据存储在 Kafka 里面,
所以数据是存储在磁盘中。
注意在 Flume1.7 以前,Kafka Channel 很少有人使用,因为发现 parseAsFlumeEvent 这个 配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false, 都会转为Flume Event。 这样的话,造成的结果是,会始终都把 Flume 的 headers 中的信息混合着内容一起写入 Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。
所以数据是存储在磁盘中。
注意在 Flume1.7 以前,Kafka Channel 很少有人使用,因为发现 parseAsFlumeEvent 这个 配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false, 都会转为Flume Event。 这样的话,造成的结果是,会始终都把 Flume 的 headers 中的信息混合着内容一起写入 Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。
Source 到 Channel 是 Put 事务
Channel 到 Sink 是 Take 事务
Flume 配置
配置文件
Flume 拦截器
拦截器注意事项
自定义了:ETL 拦截器和区分类型拦截器。
采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
自定义拦截器步骤
实现 Interceptor
重写四个方法
initialize 初始化
publicEventintercept(Eventevent) 处理单个 Event
public List<Event> intercept(List<Event> events) 处理多个 Event,在这个 方法中调用 Eventintercept(Eventevent)
close 方法
静态内部类,实现 Interceptor.Builder
pom.xml
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> </dependencies>
Flume 监控器
Ganglia
项目经验
Flume 组件
FileChannel 优化
通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。
checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据
Sink:HDFS Sink
HDFS 存入大量小文件,有什么影响?
计算层面:默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命
HDFS 小文件处理
官方默认的这三个参数配置写入 HDFS 后会产生小文件, hdfs.rollInterval、 hdfs.rollSize、 hdfs.rollCount
基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount=0 几个参数综合作用,效果如下:
(1)文件在达到 128M 时会滚动生成新文件 (2)文件创建超 3600 秒时会滚动生成新文件
基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount=0 几个参数综合作用,效果如下:
(1)文件在达到 128M 时会滚动生成新文件 (2)文件创建超 3600 秒时会滚动生成新文件
Flume 内存优化
问题描述:如果启动消费 Flume 抛出如下异常
ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded
ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded
在 hadoop102 服务器的/opt/module/flume/conf/flume-env.sh 文件中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
Flume 内存参数设置及优化
JVMheap 一般设置为 4G 或更高,部署在单独的服务器上(4 核 8 线程 16G 内存)
-Xmx 与-Xms 最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导 致频繁 fullgc。
-Xms 表示 JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示 JVM Heap(堆内存)最
大允许的尺寸,按需分配。 如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。
大允许的尺寸,按需分配。 如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。
Flume 采集数据会丢失吗?
不会,Channel 存储可以存储在 File 中,数据传输自身有事务。
kafka安装
配置文件
server.properties
#broker的全局唯一编号,不能重复
启动
bin/kafka-server-start.sh config/server.properties &
常用命令
查看 KafkaTopic 列表
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
创建 KafkaTopic
bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
删除 KafkaTopic
bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_start
Kafka 生产消息
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_start
Kafka 消费消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic topic_start
查看 KafkaTopic 详情
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic topic_start
消费 Kafka 数据 Flume
Kafka 监控
开源的监控器:KafkaManager、KafkaMonitor
Kafka 的 ISR 副本同步队列
ISR(In-SyncReplicas),副本同步队列。ISR 中包括 Leader 和 Follower。如果 Leader 进程挂掉,会在 ISR 队列中选择一个服务作为新的 Leader。有 replica.lag.max.messages(延 迟条数)和 replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入 ISR 副 本队列,在 0.10 版本移除了 replica.lag.max.messages 参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把 Follower 剔除出 ISR,存入 OSR(Outof-SyncReplicas) 列表,新加入的 Follower 也会先存放在 OSR 中。
任意一个维度超过阈值都会把 Follower 剔除出 ISR,存入 OSR(Outof-SyncReplicas) 列表,新加入的 Follower 也会先存放在 OSR 中。
Kafka 幂等性
Kafka0.11 版本引入了幂等性,幂等性配合 atleastonce 语义可以实现 exactly once 语义。但只能保证单次会话的幂等。
Kafka 事务
Kafka0.11 版本引入 Kafka 的事务机制,其可以保证生产者发往多个分区的一批数据的原子性。
项目经验
Kafka 压力测试
Kafka Producer 压力测试
bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
num-records 是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。
num-records 是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。
Kafka Consumer 压力测试
bin/kafka-consumer-perf-test.sh --zookeeper hadoop102:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1
Kafka 机器数量计算
Kafka 机器数量(经验公式)=2*(峰值生产速度*副本数/100)+1
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署 Kafka 的数量。 比如我们的峰值生产速度是 50M/s。副本数为 2。 Kafka 机器数量=2*(50*2/100)+1=3 台
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署 Kafka 的数量。 比如我们的峰值生产速度是 50M/s。副本数为 2。 Kafka 机器数量=2*(50*2/100)+1=3 台
Kafka 的日志保存时间
3 天 默认7天
Kafka 的硬盘大小
每天的数据量*3 天
Kakfa 分区数。
创建一个只有 1 个分区的 topic
测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。
假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。
然后假设总的目标吞吐量是 Tt,那么分区数=Tt/min(Tp,Tc)
例如:producer 吞吐量=10m/s;consumer 吞吐量=50m/s,期望吞吐量 100m/s
分区数=100/10=10 分区
分区数一般设置为:3-10 个
分区数=100/10=10 分区
分区数一般设置为:3-10 个
Kafka 分区分配
Range 和 RoundRobin
副本数设定
一般我们设置成 2 个或 3 个,很多企业设置为 2 个
多少个 Topic
通常情况:多少个日志类型就多少个 Topic。也有对日志类型进行合并的。
Kafka 丢不丢数据
Ack=0,producer 不等待 kafka broker 的 ack,一直生产数据。
Ack=1,leader 数据落盘就发送 ack,producer 收到 ack 才继续生产数据。
Ack=-1,ISR 中的所有副本数据罗盘才发送 ack,producer 收到 ack 才继续生产数据。
Kafka 消息数据积压,Kafka 消费能力不足怎么处理
如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消 费组的消费者数量,消费者数=分区数。(两者缺一不可)
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉 取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
总结
常用linux/shell命令
top
df-h
iotop
查看磁盘 IO 读写(yum install iotop 安装)
iotop -o
直接查看比较高的磁盘读写程序
netstat -tunlp|grep 端口号
uptime
Hadoop相关总结
Hadoop 默认不支持 LZO 压缩,如果需要支持 LZO 压缩,需要添加 jar 包,并在 hadoop 的 cores-site.xml 文件中添加相关压缩配置。需要掌握让 LZO 文件支持切片。
Hadoop 常用端口号,50070,8088,19888,9000
Hadoop 配置文件以及简单的 Hadoop 集群搭建。8 个配置文件
HDFS 读流程和写流程
MapReduce 的 Shuffle 过程及 Hadoop 优化(包括:压缩、小文件、集群优化)
Yarn 的 Job 提交流程
Yarn 的默认调度器、调度器分类、以及他们之间的区别
HDFS 存储多目录
Hadoop 参数调优
项目经验之基准测试
业务数据采集平台
电商业务流程
电商常识
SKU
SKU=StockKeepingUnit(库存量基本单位)。现在已经被引申为产品统一编号的简称, 每种产品均对应有唯一的 SKU 号。
SPU
SPU(StandardProductUnit):是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息集合。
例如:iPhoneX 手机就是 SPU。一台银色、128G 内存的、支持联通网络的 iPhoneX, 就是 SKU。
电商业务表结构
环境搭建
MySQL安装
Sqoop安装
下载并解压
下载地址:http://mirrors.hust.edu.cn/apache/sqoop/1.4.6/
配置文件 sqoop-env.sh
export HADOOP_COMMON_HOME=/opt/module/hadoop-2.7.2
export HADOOP_MAPRED_HOME=/opt/module/hadoop-2.7.2
export HIVE_HOME=/opt/module/hive export ZOOKEEPER_HOME=/opt/module/zookeeper-3.4.10
export ZOOCFGDIR=/opt/module/zookeeper-3.4.10/conf
export HBASE_HOME=/opt/module/hbase
export HADOOP_MAPRED_HOME=/opt/module/hadoop-2.7.2
export HIVE_HOME=/opt/module/hive export ZOOKEEPER_HOME=/opt/module/zookeeper-3.4.10
export ZOOCFGDIR=/opt/module/zookeeper-3.4.10/conf
export HBASE_HOME=/opt/module/hbase
拷贝 JDBC 驱动
拷贝 jdbc 驱动到 sqoop 的 lib 目录下。
测试 Sqoop 是否能够成功连接数据库
bin/sqoop list-databases --connect jdbc:mysql://hadoop102:3306/ --username root --password 000000
hive安装
下载解压安装
修改配置文件
集成tez
Tez 是一个 Hive 的运行引擎,性能优于 MR。
用 Hive 直接编写 MR 程序,假设有四个有依赖关系的 MR 作业,
上图中,绿色是 Reduce Task,云状表示写屏蔽,需要将中间结果持久化写到 HDFS。
Tez 可以将多个有依赖的作业转换为一个作业,这样只需写一次 HDFS,且中间节点较
少,从而大大提升作业的计算性能。
上图中,绿色是 Reduce Task,云状表示写屏蔽,需要将中间结果持久化写到 HDFS。
Tez 可以将多个有依赖的作业转换为一个作业,这样只需写一次 HDFS,且中间节点较
少,从而大大提升作业的计算性能。
下载 tez 的依赖包:http://tez.apache.org
将 apache-tez-0.9.1-bin.tar.gz 上传到 HDFS 的/tez 目录下。
在 Hive 的/opt/module/hive/conf 下面创建一个 tez-site.xml 文件
在 hive-env.sh 文件中添加 tez 环境变量配置和依赖包环境变量配置
mv hive-env.sh.template hive-env.sh
在 hive-site.xml 文件中添加如下配置,更改 hive 计算引擎
测试
create table student( id int, name string);
insert into student values(1,"zhangsan");
select * from student;
没有报错则成功
没有报错则成功
注意事项
运行 Tez 时检查到用过多内存而被 NodeManager 杀死进程问题:
这种问题是从机上运行的 Container 试图使用过多的内存,而被 NodeManagerkill 掉了。
这种问题是从机上运行的 Container 试图使用过多的内存,而被 NodeManagerkill 掉了。
解决方法:
关掉虚拟内存检查,修改 yarn-site.xml,
<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
分发,并重新启动 hadoop 集群。
业务数据同步策略
全量表:存储完整的数据
增量表:存储新增加的数据。
新增及变化表:存储新增加的数据和变化的数据。
特殊表:只需要存储一次。
业务数据采集同步脚本
注意:Hive 中的 Null 在底层是以“\N”来存储,而 MySQL 中的 Null 在底层就是 Null,为了 保证数据两端的一致性。在导出数据时采用--input-null-string 和--input-null-non-string 两个参 数。导入数据时采用--null-string 和--null-non-string。
数据仓库系统
数仓分层
ODS(原始数据层)
原始数据,保持数据不做处理
ods_表名
DWD(明细数据层)
对ODS层数据进行清洗(去除空值、脏数据、超过极限范围的数据)、维度退化、脱敏等
dwd_dim/fact_表名
DWS(服务数据层)
DWD为基础,按天进行轻度汇总
dws_表名
DWT(数据主题层)
DWS为基础,按主题进行汇总
dwt_购物车
ADS(数据应用层)
ADS层,为各种统计报表提供数据
ads_表名
数据集市与数据仓库
数据集市:微型的数据仓库,部门级
数据仓库,企业级
数仓理论
范式理论
定义
范式可以理解为设计一张数据表的表结构,符合的标准级别。 规范和要求
优点
关系型数据库设计时,遵照一定的规范要求,目的在于降低数据的冗余性。
缺点
范式的缺点是获取数据时,需要通过 Join 拼接出最后的数据。
分类
目前业界范式有:第一范式(1NF)、第二范式(2NF)、第三范式(3NF)、巴斯-科德范式 (BCNF)、第四范式(4NF)、第五范式(5NF)。
第一范式
属性不可切割
第二范式
不能存在部分函数依赖
第三范式
不能存在传递函数依赖
函数依赖
完全函数依赖
通过AB能得出C,但是AB单独得不出C,那么说C完全依赖于AB
部分函数依赖
通过AB能得出C,通过A也能得出C,或者通过B也能得出C,那么说C部分依赖于AB
传递函数依赖
通过A得到B,通过B得到C,但是C得不到A,那么说C传递依赖于A
关系建模与维度建模
联机事务处理 OLTP:OLTP 是传统的关系 型数据库的主要应用,主要是基本的、日常的事务处理,例如银行交易
1.每次查询只返回少量记录
2.随机、低延时写入用户的输入
3.用户,JavaEE 项目
4.最新数据状态
5.GB
2.随机、低延时写入用户的输入
3.用户,JavaEE 项目
4.最新数据状态
5.GB
联机分析处理 OLAP:OLAP 是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。
1.对大量记录进行汇总
2.批量导入
3.内部分析师,为决策提供支持
4.随时间变化的历史状态
5.TB 到 PB
2.批量导入
3.内部分析师,为决策提供支持
4.随时间变化的历史状态
5.TB 到 PB
关系建模
数据库建模,严格遵守三范式
关系模型虽然冗余少,但是在大规模数据,跨表分析统计查询过程中,会造成多表关
联,这会大大降低执行效率
关系模型虽然冗余少,但是在大规模数据,跨表分析统计查询过程中,会造成多表关
联,这会大大降低执行效率
维度建模
星型模型
雪花模型与星型模型的区别主要在于维度的层级,标准的星型模型维度只有一层,
而雪花模型可能会设计多级
而雪花模型可能会设计多级
雪花模型
雪花模型,比较靠近3NF,但是无法完全遵守,因为遵循3NF的性能成本太高
星座模型
星座模型与前两者情况的区别是事实表的数量,星座模型是基于多个事实表
模型选择
灵活选择,一般都是多个模型并存
维度表和事实表
维度表:一般是对事实的描述信息。每一张维表对应现实世界中的一个对象或者概念。
例如:用户、商品、日期、地区等。
例如:用户、商品、日期、地区等。
特征:
- 维表的范围很宽(具有多个属性、列比较多)
- 跟事实表相比,行数相对较小:通常<10 万条
- 内容相对固定:编码表
事实表:每行数据代表一个业务事件(下单、支付、退款、评价等)。“事实”这
个术语表示的是业务事件的度量值(可统计次数、个数、件数、金额等),例如,订单事
件中的下单金额。
每一个事实表的行包括:具有可加性的数值型的度量值、与维表相连接的外键、通常具
有两个和两个以上的外键、外键之间表示维表之间多对多的关系。
个术语表示的是业务事件的度量值(可统计次数、个数、件数、金额等),例如,订单事
件中的下单金额。
每一个事实表的行包括:具有可加性的数值型的度量值、与维表相连接的外键、通常具
有两个和两个以上的外键、外键之间表示维表之间多对多的关系。
特征:
- 非常的大
- 内容相对的窄:列数较少
- 经常发生变化,每天会新增加很多
事务型事实表:以每个事务或事件为单位,例如一个销售订单记录,一笔支付记录等,作为事实表里的
一行数据。一旦事务被提交,事实表数据被插入,数据就不再进行更改,其更新方式为增量
更新。
一行数据。一旦事务被提交,事实表数据被插入,数据就不再进行更改,其更新方式为增量
更新。
周期型快照事实表:周期型快照事实表中不会保留所有数据,只保留固定时间间隔的数据,例如每天或者
每月的销售额,或每月的账户余额等。
每月的销售额,或每月的账户余额等。
累积型快照事实表:累计快照事实表用于跟踪业务事实的变化。例如,数据仓库中可能需要累积或者存储
订单从下订单开始,到订单商品被打包、运输、和签收的各个业务阶段的时间点数据来跟踪
订单声明周期的进展情况。当这个业务过程进行时,事实表的记录也要不断更新。
订单从下订单开始,到订单商品被打包、运输、和签收的各个业务阶段的时间点数据来跟踪
订单声明周期的进展情况。当这个业务过程进行时,事实表的记录也要不断更新。
数据仓库建模
ODS 层
保持数据原貌不做任何修改,起到备份数据的作用。
数据采用压缩,减少磁盘存储空间(例如:原始数据 100G,可以压缩到 10G 左
右)
右)
创建分区表,防止后续的全表扫描
DWD 层
DWD 层需构建维度模型,一般采用星型模型,呈现的状态一般为星座模型。
维度建模一般按照以下四个步骤:
选择业务过程→声明粒度→确认维度→确认事实
选择业务过程→声明粒度→确认维度→确认事实
选择业务过程:
在业务系统中,挑选我们感兴趣的业务线,比如下单业务,支付业务,退款业务,物流
业务,一条业务线对应一张事实表。
在业务系统中,挑选我们感兴趣的业务线,比如下单业务,支付业务,退款业务,物流
业务,一条业务线对应一张事实表。
声明粒度:
数据粒度指数据仓库的数据中保存数据的细化程度或综合程度的级别。
声明粒度意味着精确定义事实表中的一行数据表示什么,应该尽可能选择最小粒度,以
此来应各种各样的需求。
典型的粒度声明如下:
订单中,每个商品项作为下单事实表中的一行,粒度为每次下单
每周的订单次数作为一行,粒度就是每周下单。
每月的订单次数作为一行,粒度就是每月下单
数据粒度指数据仓库的数据中保存数据的细化程度或综合程度的级别。
声明粒度意味着精确定义事实表中的一行数据表示什么,应该尽可能选择最小粒度,以
此来应各种各样的需求。
典型的粒度声明如下:
订单中,每个商品项作为下单事实表中的一行,粒度为每次下单
每周的订单次数作为一行,粒度就是每周下单。
每月的订单次数作为一行,粒度就是每月下单
确定维度:
维度的主要作用是描述业务是事实,主要表示的是“谁,何处,何时”等信息。
维度的主要作用是描述业务是事实,主要表示的是“谁,何处,何时”等信息。
确定事实:
此处的“事实”一词,指的是业务中的度量值,例如订单金额、下单次数等。
在 DWD 层,以业务过程为建模驱动,基于每个具体业务过程的特点,构建最细粒度的
明细层事实表。事实表可做适当的宽表化处理。
此处的“事实”一词,指的是业务中的度量值,例如订单金额、下单次数等。
在 DWD 层,以业务过程为建模驱动,基于每个具体业务过程的特点,构建最细粒度的
明细层事实表。事实表可做适当的宽表化处理。
DWS 层
统计各个主题对象的当天行为,服务于 DWT 层的主题宽表,以及一些业务明细数据,
应对特殊需求(例如,购买行为,统计商品复购率)。
应对特殊需求(例如,购买行为,统计商品复购率)。
DWT 层
以分析的主题对象为建模驱动,基于上层的应用和产品的指标需求,构建主题对象的全
量宽表。
量宽表。
ADS 层
对电商系统各大主题指标分别进行分析。
数仓搭建
ODS层
用户行为数据
ods_start_log(启动日志表)
ods_event_log(事件日志表)
加载数据脚本
业务数据
订单表(增量及更新)
订单详情表(增量)
SKU 商品表(全量)
用户表(增量及更新)
商品一级分类表(全量)
商品二级分类表(全量)
商品三级分类表(全量)
支付流水表(增量)
省份表(特殊)
地区表(特殊)
品牌表(全量)
订单状态表(增量)
SPU 商品表(全量)
商品评论表(增量)
退单表(增量)
加购表(全量)
商品收藏表(全量)
优惠券领用表(新增及变化)
优惠券表(全量)
活动表(全量)
活动订单关联表(增量)
优惠规则表(全量)
编码字典表(全量)
加载数据脚本
DWD层
步骤
- 对用户行为数据解析
- 对核心数据进行判空过滤。
- 对业务数据采用维度模型重新建模,即维度退化。
用户行为启动表数据解析
ods_start_log ——> dwd_start_log
创建启动表
加载数据脚本
用户行为事件表数据解析
创建事件日志基础明细表
UDF 函数特点:一行进一行出。简称,一进一出。
代码
UDTF 函数特点:多行进多行出。 简称,多进多出。
代码
数据解析脚本
商品点击表
建表语句
商品详情页表
建表语句
商品列表页表
建表语句
广告表
建表语句
消息通知表
建表语句
用户后台活跃表
建表语句
评论表
建表语句
收藏表
建表语句
点赞表
建表语句
错误日志表
建表语句
加载数据脚本
业务数据
商品维度表(全量表)
导入数据
优惠券信息表(全量)
导入数据
活动维度表(全量)
导入数据
地区维度表(特殊)
导入数据
时间维度表(特殊)(预留)
订单明细事实表(事务型快照事实表)
导入数据
支付事实表(事务型快照事实表)
导入数据
退款事实表(事务型快照事实表)
导入数据
评价事实表(事务型快照事实表)
导入数据
加购事实表(周期型快照事实表,每日快照)
- 由于购物车的数量是会发生变化,所以导增量不合适。
- 每天做一次快照,导入的数据是全量,区别于事务型事实表是每天导入新增。
- 周期型快照事实表劣势:存储的数据量会比较大。
- 解决方案:周期型快照事实表存储的数据比较讲究时效性,时间太久了的意义不大,可以删除以前的数据。
导入数据
收藏事实表(周期型快照事实表,每日快照)
导入数据
优惠券领用事实表(累积型快照事实表)
- 优惠卷的生命周期:领取优惠卷-》用优惠卷下单-》优惠卷参与支付
- 累积型快照事实表使用:统计优惠卷领取次数、优惠卷下单次数、优惠卷参与支付次数
导入数据
订单事实表(累积型快照事实表)
导入数据
用户维度表(拉链表)
用户表中的数据每日既有可能新增,也有可能修改,但修改频率并不高,属于缓慢变化
维度,此处采用拉链表存储用户维度数据。
维度,此处采用拉链表存储用户维度数据。
什么是拉链表?
记录每条信息的生命周期,一旦一条记录的生命周期结束,就重新开始一条新的记录,并把当前
日期放入生效日期开始。
记录每条信息的生命周期,一旦一条记录的生命周期结束,就重新开始一条新的记录,并把当前
日期放入生效日期开始。
为什么要做拉链表?
拉链表适合于:数据会发生变化,但是大部分是不变的
比如:用户信息会发生变化,但是每天变化的比例不高。如果数据量有一定规模,按照每日全量的
方式保存效率很低。
拉链表适合于:数据会发生变化,但是大部分是不变的
比如:用户信息会发生变化,但是每天变化的比例不高。如果数据量有一定规模,按照每日全量的
方式保存效率很低。
拉链表制作流程
建表并初始化拉链表
建立临时表
导入数据
把临时表覆盖给拉链表
数据导入脚本
DWS 层
每日设备行为
导入数据
每日会员行为
导入数据
每日商品行为
导入数据
每日优惠券统计
导入数据
每日活动统计
导入数据
每日购买行为
导入数据
导入数据脚本
DWT层
设备主题宽表
导入数据
会员主题宽表
导入数据
商品主题宽表
导入数据
优惠券主题宽表
导入数据
活动主题宽表
导入数据
数据导入脚本
ADS层
设备主题
活跃设备数(日、周、月)
导入数据
每日新增设备
导入数据
沉默用户数
导入数据
本周回流用户数
导入数据
流失用户数
导入数据
留存率
导入数据
最近连续三周活跃用户数
导入数据
最近七天内连续三天活跃用户数
导入数据
会员主题
会员主题信息
导入数据
漏斗分析
导入数据
商品主题
商品个数信息
导入数据
商品销量排名
导入数据
商品收藏排名
导入数据
商品加入购物车排名
导入数据
商品退款率排名(最近 30 天)
导入数据
商品差评率
导入数据
营销主题(用户+商品+购买行为)
下单数目统计
导入数据
支付信息统计
导入数据
复购率
导入数据
数据导入脚本
技术框架
Azkaban 任务调度
Azkaban 的架构
- AzkabanWebServer:AzkabanWebServer 是整个 Azkaban 工作流系统的主要管理者,它用户登录认证、负责 project 管理、定时执行工作流、跟踪工作流执行进度等一系列任务。
- AzkabanExecutorServer:负责具体的工作流的提交、执行,它们通过 mysql 数据库来协调任务的执行。
- 关系型数据库(MySQL):存储大部分执行流状态,AzkabanWebServer 和 AzkabanExecutorServer 都需要访问数据库。
下载地址:http://azkaban.github.io/downloads.html
安装部署
准备
- azkaban-web-server-2.5.0.tar.gz
- azkaban-executor-server-2.5.0.tar.gz
- azkaban-sql-script-2.5.0.tar.gz
- mysql-libs.zip
安装
- 在/opt/module/目录下创建 azkaban 目录 mkdir azkaban
- 解 压 azkaban-web-server-2.5.0.tar.gz 、 azkaban-executor-server-2.5.0.tar.gz 、 azkaban-sql-script-2.5.0.tar.gz 到/opt/module/azkaban 目录下
- 对解压后的文件重新命名 mv azkaban-web-2.5.0/ server mv azkaban-executor-2.5.0/ executor
- azkaban 脚本导入 source/opt/module/azkaban/azkaban-2.5.0/create-all-sql-2.5.0.sql
生成密钥库
Keytool 是 java 数据证书的管理工具,使用户能够管理自己的公/私钥对及相关证书。
-keystore 指定密钥库的名称及位置(产生的各类信息将不在.keystore 文件中)
-genkey 在用户主目录中创建一个默认文件".keystore"
-alias 对我们生成的.keystore 进行指认别名;如果没有默认是 mykey
-keyalg 指定密钥的算法 RSA/DSA 默认是 DSA
-keystore 指定密钥库的名称及位置(产生的各类信息将不在.keystore 文件中)
-genkey 在用户主目录中创建一个默认文件".keystore"
-alias 对我们生成的.keystore 进行指认别名;如果没有默认是 mykey
-keyalg 指定密钥的算法 RSA/DSA 默认是 DSA
- 生成 keystore 的密码及相应信息的密钥库 keytool -keystore keystore -alias jetty -genkey -keyalg RSA
- 将 keystore 拷贝到 azkabanweb 服务器根目录中 mv keystore /opt/module/azkaban/server/
配置文件
Web 服务器配置
进入 azkabanweb 服务器安装目录 conf 目录,打开 azkaban.properties 文件 /opt/module/azkaban/server/conf
vim azkaban.properties
在 azkabanweb 服务器安装目录 conf 目录,按照如下配置修改 azkaban-users.xml 文件,增加管理员用户。
vim azkaban-users.xml
执行服务器配置
进入执行服务器安装目录 conf,打开 azkaban.properties /opt/module/azkaban/executor/conf
vim azkaban.properties
启动
启动 executor服务器
bin/azkaban-executor-start.sh
启动 web服务器
bin/azkaban-web-start.sh
注意:
先执行 executor,再执行 web,避免 WebServer 会因为找不到执行器启动失败。
先执行 executor,再执行 web,避免 WebServer 会因为找不到执行器启动失败。
https://服务器 IP 地址:8443
创建执行任务
建库建表
编写 Sqoop 导出脚本
vim hdfs_to_mysql.sh
会员主题指标获取的全调度流程(顺序执行)
mysql_to_hdfs.job
hdfs_to_ods_log.job
hdfs_to_ods_db.job
ods_to_dwd_start_log.job
ods_to_dwd_db.job
dwd_to_dws.job
dws_to_dwt.job
dwt_to_ads.job
hdfs_to_mysql.job
superset可视化
安装python3
配置 python3.6环境
0 条评论
下一页