大数据安全技术栈树状图
2021-10-30 12:08:18 0 举报
针对大数据安全分析平台研发架构师归纳总结,后期会持续迭代技术栈
作者其他创作
大纲/内容
信息安全技术
企业大数据安全攻防知识库
web安全基础
理解跨站请求伪造原理
SQL注入攻击原理
文件上传攻击原理等
工具:burpsuite、SQLMAP
熟悉http报文数据格式
熟悉网络抓包测试工具的使用深入理解数据包构造
工具一:wirshark
工具二:Brim
常用的查询语法"!", "!=", "!~", "(", "<", "<=", "=", "=~", ">", ">=", "and", "in", "not", "or", "|", [\n\r
], comment, or whitespace but "-" found.
], comment, or whitespace but "-" found.
了解AST 语法树知识不开发态势感知DSL语言无需深入了解原理但是职业方向是这个方向的必须要懂
入侵开源检测框架:zeek
熟悉Lua脚本语言开发技术
熟悉正则表达式
.匹配任何字符
^行首
$行尾
\转义特殊字符(*?+[](){}^$| \\./)
\w字
\t水平制表
\n换行
\d任意数字
\s空格字符(\t\n\f\r\p{Z})
[…]匹配括号内的任何字符(或字符范围)。范围可以是[a-z]、[a-z]、[3…5]等。
\D不是十进制数字的任何字符
\S非空白字符
\W非文字字符
^行首
$行尾
\转义特殊字符(*?+[](){}^$| \\./)
\w字
\t水平制表
\n换行
\d任意数字
\s空格字符(\t\n\f\r\p{Z})
[…]匹配括号内的任何字符(或字符范围)。范围可以是[a-z]、[a-z]、[3…5]等。
\D不是十进制数字的任何字符
\S非空白字符
\W非文字字符
了解恶意软件的工作流程以及原理并熟悉操作系统安全日志触发告警的核心原理
熟悉威胁情报的使用以及威胁情报的情报数据规范
MISP基金会威胁情报数据标准
微步在线威胁情报社区API接口使用
OTX威胁情报社区API接口使用
熟悉安全厂商各类产品包括日志字段构成以及类型
MITRE ATT&CK矩阵
每个攻击阶段都有对应的编号该编号为大数据安全平台建设提供数据打标服务
大数据技术
数据仓库知识
数仓技术
Nosql数据库
Clickhouse
入门
简介
俄罗斯开源、C++编写、面向OLAP、能使用SQL实时查询的列式存储数据库
本文大体上基于20210730左右版本整理
特点
列式存储
对于列的聚合,计数,求和等统计操作原因优于行式存储。
格式相同,便于压缩
支持DBMS功能
支持大部分标准SQL
多样化存储引擎
不同表按需设置存储引擎
高吞吐能力
采用类似LSM Tree,append写入,定期合并
数据分区与线程并行
数据划分多个partition
partition进一步划分多个index granularity(索引粒度)
多个cpu分别处理其中一部分
优点
单个Query就可以使用所有cpu,极致执行效率
缺点
不利于qps高的查询,即不适于并行查询
性能
单表查询快于多表查询
安装
关键目录(默认)
子主题
数据类型
整型
固定长度的整型,包括有符号整型或无符号整型。
整型范围(-2n-1~2n-1-1):
Int8 - [-128 : 127]
Int16 - [-32768 : 32767]
Int32 - [-2147483648 : 2147483647]
Int64 - [-9223372036854775808 : 9223372036854775807]
无符号整型范围(0~2n-1):
UInt8 - [0 : 255]
UInt16 - [0 : 65535]
UInt32 - [0 : 4294967295]
UInt64 - [0 : 18446744073709551615]
使用场景: 个数、数量、也可以存储型 id。
整型范围(-2n-1~2n-1-1):
Int8 - [-128 : 127]
Int16 - [-32768 : 32767]
Int32 - [-2147483648 : 2147483647]
Int64 - [-9223372036854775808 : 9223372036854775807]
无符号整型范围(0~2n-1):
UInt8 - [0 : 255]
UInt16 - [0 : 65535]
UInt32 - [0 : 4294967295]
UInt64 - [0 : 18446744073709551615]
使用场景: 个数、数量、也可以存储型 id。
浮点型
Float32 - float
Float64 – double
建议尽可能以整数形式存储数据。
例如,将固定精度的数字转换为整数值,
如时间用毫秒为单位表示,因为浮点型进行计算时可能引起四舍五入的误差。
Float64 – double
建议尽可能以整数形式存储数据。
例如,将固定精度的数字转换为整数值,
如时间用毫秒为单位表示,因为浮点型进行计算时可能引起四舍五入的误差。
布尔型
没有单独的类型来存储布尔值。
可以使用 UInt8 类型,取值限制为 0 或 1。
可以使用 UInt8 类型,取值限制为 0 或 1。
Decimal 型
有符号的浮点数,可在加、减和乘法运算过程中保持精度。
对于除法,最低有效数字会
被丢弃(不舍入)。
有三种声明:
➢ Decimal32(s),相当于 Decimal(9-s,s),有效位数为 1~9
➢ Decimal64(s),相当于 Decimal(18-s,s),有效位数为 1~18
➢ Decimal128(s),相当于 Decimal(38-s,s),有效位数为 1~38
s 标识小数位
使用场景: 一般金额字段、汇率、利率等字段为了保证小数点精度,都使用 Decimal
进行存储。
对于除法,最低有效数字会
被丢弃(不舍入)。
有三种声明:
➢ Decimal32(s),相当于 Decimal(9-s,s),有效位数为 1~9
➢ Decimal64(s),相当于 Decimal(18-s,s),有效位数为 1~18
➢ Decimal128(s),相当于 Decimal(38-s,s),有效位数为 1~38
s 标识小数位
使用场景: 一般金额字段、汇率、利率等字段为了保证小数点精度,都使用 Decimal
进行存储。
字符串
1)String
字符串可以任意长度的。它可以包含任意的字节集,包含空字节。
2)FixedString(N)
固定长度 N 的字符串,N 必须是严格的正自然数。
当服务端读取长度小于 N 的字符串时候,通过在字符串末尾添加空字节来达到 N 字节长度。
当服务端读取长度大于 N 的
字符串时候,将返回错误消息。
字符串可以任意长度的。它可以包含任意的字节集,包含空字节。
2)FixedString(N)
固定长度 N 的字符串,N 必须是严格的正自然数。
当服务端读取长度小于 N 的字符串时候,通过在字符串末尾添加空字节来达到 N 字节长度。
当服务端读取长度大于 N 的
字符串时候,将返回错误消息。
枚举类型
包括 Enum8 和 Enum16 类型。
Enum 保存 'string'= integer 的对应关系。
Enum8 用 'String'= Int8 对描述。
Enum16 用 'String'= Int16 对描述。
Enum 保存 'string'= integer 的对应关系。
Enum8 用 'String'= Int8 对描述。
Enum16 用 'String'= Int16 对描述。
时间类型
目前 ClickHouse 有三种时间类型
Date 接受年-月-日的字符串比如 ‘2019-12-16’
Datetime 接受年-月-日 时:分:秒的字符串比如 ‘2019-12-16 20:50:10’
Datetime64 接受年-月-日 时:分:秒.亚秒的字符串比如‘2019-12-16 20:50:10.66’
日期类型,用两个字节存储,表示从 1970-01-01 (无符号) 到当前的日期值。
Date 接受年-月-日的字符串比如 ‘2019-12-16’
Datetime 接受年-月-日 时:分:秒的字符串比如 ‘2019-12-16 20:50:10’
Datetime64 接受年-月-日 时:分:秒.亚秒的字符串比如‘2019-12-16 20:50:10.66’
日期类型,用两个字节存储,表示从 1970-01-01 (无符号) 到当前的日期值。
数组
Array(T):由 T 类型元素组成的数组。
T 可以是任意类型,包含数组类型。
但不推荐使用多维数组,ClickHouse 对多维数组的支持有限。
例如,不能在 MergeTree 表中存储多维数组。
T 可以是任意类型,包含数组类型。
但不推荐使用多维数组,ClickHouse 对多维数组的支持有限。
例如,不能在 MergeTree 表中存储多维数组。
表引擎
表引擎决定表的存储方式
数据存储的方式和位置
支持哪些查询以及如何支持
是否支持并发访问
索引的使用
是否支持多线程
数据复制的参数
主要类别
TinyLog
以列文件的形式保存在磁盘上,
不支持索引,
没有并发控制。
一般保存少量数据的小表,
生产环境上作用有限。
可以用于平时练习测试用。
不支持索引,
没有并发控制。
一般保存少量数据的小表,
生产环境上作用有限。
可以用于平时练习测试用。
Memory
内存引擎,数据以未压缩的原始形式直接保存在内存当中,服务器重启数据就会消失。
读写操作不会相互阻塞,不支持索引。
简单查询下有非常非常高的性能表现(超过 10G/s)。
简单查询下有非常非常高的性能表现(超过 10G/s)。
读写操作不会相互阻塞,不支持索引。简单查询下有非常非常高的性能表现(超过 10G/s)。
MergeTree/*MergeTree
最强大的表引擎
支持索引和分区,地位可以相当于 innodb 之于 Mysql
三个关键参数
partition by 分区(可选)
是降低扫描的范围,优化查询速度,不填只有一个分区
分区目录:MergeTree 是以列文件+索引文件+表定义文件组成的
在设置分区后这些文件会保存到不同的分区目录中
在设置分区后这些文件会保存到不同的分区目录中
并行:对于跨分区的统计,ClickHouse以分区为单位并行处理
数据写入与分区合并
任何一个批次的数据写入都会产生一个临时分区,不会纳入任何一个已有的分区。
写入后的某个时刻(大概 10-15 分钟后),
ClickHouse 会自动执行合并操作(可以手动通过 optimize 执行),
把临时分区的数据,合并到已有分区中
ClickHouse 会自动执行合并操作(可以手动通过 optimize 执行),
把临时分区的数据,合并到已有分区中
primary key 主键(可选)
提供了数据的一级索引,但是却不是唯一约束。
即可以存在相同 primary key
即可以存在相同 primary key
设定的主要依据是查询的where条件,定位到对应的 index granularity,避免全表扫描
order by(必选)
设定了分区内的数据按照哪些字段顺序进行有序保存。
MergeTree 中唯一一个必填项,
不设置主键的情况,很多处理会依照 order by 的字段进行处理(比如去重,汇总)
不设置主键的情况,很多处理会依照 order by 的字段进行处理(比如去重,汇总)
要求:主键必须是 order by 字段的前缀字段。
比如 order by 字段是 (id,sku_id) 那么主键必须是 id 或者(id,sku_id)
比如 order by 字段是 (id,sku_id) 那么主键必须是 id 或者(id,sku_id)
其他参数
二级索引
20.1.2.4 之前是被标注为实验性的,后续版本默认开启
其中 GRANULARITY N 是设定二级索引对于一级索引粒度的粒度。
数据 TTL
TTL 即 Time To Live,MergeTree 提供了可以管理数据表或者列的生命周期的功能。
级别
列级别 TTL
表级别 TTL
子主题
支持的周期
SECOND
MINUTE
HOUR
DAY
WEEK
MONTH
QUARTER
YEAR
注意
涉及判断的字段必须是 Date 或者 Datetime 类型,推荐使用分区的日期字段。
ReplacingMergeTree
概览
ReplacingMergeTree 是 MergeTree 的一个变种,存储特性继承 MergeTree,具有去重的功能。
可以借助ReplacingMergeTree去除重复数据
去重的时机
的去重只会在合并的过程中出现,(新版本在插入过程中会先对插入部分去重)
合并会在未知的时间在后台进行,无法预先作出计划。有一些数据可能仍未被处理。
去重范围
分区内去重,不能执跨分区去重。
order by 字段作为唯一键去重
适用范围
ReplacingMergeTree 能力有限,
ReplacingMergeTree 适用于在后台清除重复的数据以节省空间,但是它不保证没有重复的数据出现。
ReplacingMergeTree 适用于在后台清除重复的数据以节省空间,但是它不保证没有重复的数据出现。
保留字段
ReplacingMergeTree() 填入的参数为版本字段,重复数据保留版本字段值最大的。
(版本一般为时间字段)
(版本一般为时间字段)
如果不填版本字段,默认按照插入顺序保留最后一条。
SummingMergeTree
概览
一种能够“预聚合”的引擎
适用场景
不查询明细,只关心以维度进行汇总聚合结果的场景。
相对普通的MergeTree节省存储空间,提高查询效率
聚合时机
在同一批次插入(新版本)或分片合并时才会进行聚合
聚合范围
同分区数据聚合
order by 的列为key,作为维度列,聚合其他列
若未指定聚合其他的列按插入顺序保留第一行
如果没有指定聚合的列,所有非维度且为数字的列均会被聚合
注意
由于聚合的时机可能会包含一些还没来得及聚合的临时明细
我们无法直接获取汇总值,在使用的时候还是需要sum()
我们无法直接获取汇总值,在使用的时候还是需要sum()
SQL操作
概述
传统关系型数据库(以 MySQL 为例)的 SQL 语句,ClickHouse 基本都支持
此处只会着重介绍不一致的地方
此处只会着重介绍不一致的地方
Insert
基本与标准 SQL(MySQL)基本一致
Update/Delete
注意
ClickHouse 提供了 Delete 和 Update 的能力,这类操作被称为 Mutation 查询,它可以看做 Alter 的一种
随可以实现修改和删除,但和一般的 OLTP 数据库不一样,Mutation 语句是一种很“重”的操作,而且不支持事务。
“重”的原因主要是每次修改/者删除都会导致放弃目标数据的原有分区,重建新分区。
所以在必须删改的情况下尽量做批量的变更,不要进行频繁小数据的操作。
所以在必须删改的情况下尽量做批量的变更,不要进行频繁小数据的操作。
删改操作
Select
概述
ClickHouse 基本上与标准 SQL 差别不大
支持操作
支持子查询
支持 CTE(Common Table Expression 公用表表达式 with 子句)
支持各种 JOIN,但是 JOIN 操作无法使用缓存,所以即使是两次相同的 JOIN 语句,ClickHouse 也会视为两条新 SQL
GROUP BY 操作增加了 with rollup\with cube\with total 用来计算小计和总计。
with rollup
从右至左去掉维度进行小计
with cube
从右至左去掉维度进行小计,再从左至右去掉维度进行小计
with totals
只计算合计
窗口函数(官方正在测试中...)
不支持自定义函数
Alter操作
与Mysql修改字段基本一致
操作
新增字段
修改字段类型
删除字段
导出数据
副本
概述
副本的目的主要是保障数据的高可用性,即使一台 ClickHouse 节点宕机,那么也可以从其他服务器获得相同的数据。
副本写入流程
client写入数据到clickhouse-a
clickhouse-a向kafka 集群提交写入日志
clickhouse-b从kafka集群收到写入日志
clickhouse-b从目标副本(clickhouse-a)下载新的数据
配置步骤
官网
使用注意事项
副本只能同步数据,不能同步表结构,我们需要在每台有备份的服务器上手动建表
建表语句
参数说明
第一个参数
分片的 zk_path
一般按照:/clickhouse/table/{shard}/{table_name} 的格式
如果只有一个分片就写 01 即可
第二个参数
副本名称
相同的分片副本名称不能相同。
局限
- 副本虽然能够提高数据的可用性,降低丢失风险。
但是每台服务器实际上必须容纳全量数据,对数据的横向扩容没有解决。
分片集群
概述
解决数据水平切分的问题,需要引入分片的概念。
通过分片把一份完整的数据进行切分,不同的分片分布到不同的节点上,再通过 Distributed 表引擎把数据拼接起来一同使用。
Distributed 表引擎本身不存储数据,有点类似于 MyCat 之于 MySql,成为一种中间件,
通过分布式逻辑表来写入、分发、路由来操作多台节点不同分片的分布式数据。
通过分布式逻辑表来写入、分发、路由来操作多台节点不同分片的分布式数据。
集群写入流程
客户端向Distributed 表发起写请求((3 分片 2 副本共 6 个节点)
internal_replication
true
优化方案
Distributed 将数据写入分片的一个副本
其他的副本从该副本拉取数据
Distributed 将数据写入分片的一个副本 一共需要写三个分片各一个副本 ,一共三个副本数据
false
Distributed 将数据写入所有分片的所有副本
总共需要写三分片,每个分片两副本,一共六个副本数据
集群读取流程
客户端向Distributed 表发起读请求((3 分片 2 副本共 6 个节点)
errors_count
优先选择errors_count小的副本
errors_count相同根据随机、顺序、随机(优先第一顺位)、Host相近等方式选择
配置方法
见官网
注意
ClickHouse 的集群是表级别的,实际企业中,大部分做了高可用,但是没有用分片,避免降低查询性能以及操作集群的复杂性。
进阶
Explain查看执行计划
基本语法
EXPLAIN [AST | SYNTAX | PLAN | PIPELINE] [setting = value, ...] SELECT ... [FORMAT ...]
参数含义
PLAN
用于查看执行计划,默认值
额外参数
header
打印计划中各个步骤的 head 说明,默认关闭,默认值 0;
description
打印计划中各个步骤的描述,默认开启,默认值 1;
actions
打印计划中各个步骤的详细信息,默认关闭,默认值 0。
AST
用于查看语法树
SYNTAX
用于优化语法
PIPELINE
用于查看 PIPELINE 计划
额外参数
header
打印计划中各个步骤的 head 说明,默认关闭,默认值 0;
graph
用 DOT 图形语言描述管道图,默认关闭,需要查看相关的图形需要配合graphviz 查看;
actions
如果开启了 graph,紧凑打印打,默认开启
案例
PLAN
explain plan select arrayJoin([1,2,3,null,null]);
explain header=1, actions=1,description=1 SELECT number from system.numbers limit 10;
AST
EXPLAIN AST SELECT number from system.numbers limit 10;
SYNTAX 语法优化
EXPLAIN SYNTAX SELECT number = 1 ? 'hello' : (number = 2 ? 'world' : 'atguigu') FROM numbers(10);
优化后:SELECT multiIf(number = 1, 'hello', number = 2, 'world', 'xyz')FROM numbers(10)
PIPELINE
EXPLAIN PIPELINE SELECT sum(number) FROM numbers_mt(100000) GROUP BY number % 20;
建表优化
数据类型
时间子段的类型
能用数值或日期类型表示的字段就不用字符串
ClickHouse 底层将 DateTime 存储为时间戳 Long 类型,但不建议存储 Long 类型
DateTime 不需要经过函数转换处理,执行效率高、可读性好。
DateTime 不需要经过函数转换处理,执行效率高、可读性好。
空值存储类型
官方:Nullable 类型几乎总是会拖累性能
因为存储 Nullable 列时需要创建一个额外的文件来存储 NULL 的标记,
并且 Nullable 列无法被索引。
因为存储 Nullable 列时需要创建一个额外的文件来存储 NULL 的标记,
并且 Nullable 列无法被索引。
此除非极特殊情况,应直接使用字段默认值表示空,
或者自行指定一个在业务中无意义的值
或者自行指定一个在业务中无意义的值
分区/索引
分区
根据业务特点,不宜过粗或过细,一般按天分区,可以使用 Tuple()分区
索引
clickhouse的order by字段为索引列,必须要指定
一般是查询中充当筛选条件的列
可以是单一维度,也可以是组合维度,一般是高级、查询频率高的在前
一般基数特别大的不适合作为索引列
表参数
Index_granularity
控制索引粒度,默认是 8192,如非必须不建议调整
TTL
如果不需要保留所有历史数据,可是设置数据失效时间
免去手动过期历史数据的麻烦
写入/删除优化
尽量不要执行单条或小批量删除和插入操作
会产生小分区文件,给后台Merge 任务带来巨大压力
会产生小分区文件,给后台Merge 任务带来巨大压力
不要一次写入太多分区,或数据写入太快,数据写入太快会导致 Merge 速度跟不上而报错,
建议每秒钟发起 2-3 次写入操作,每次操作写入 2w~5w 条数据(依服务器性能而定)
建议每秒钟发起 2-3 次写入操作,每次操作写入 2w~5w 条数据(依服务器性能而定)
常见配置
配置文件
config.xml
users.xml
配置说明
cpu资源
background_pool_size
后台线程池的大小,merge 线程就是在该线程池中执行,该线程池不仅仅是给 merge 线程用的,
默认值 16,允许的前提下建议改成 cpu 个数的 2 倍(线程数)。
background_schedule_pool_size
执行后台任务(复制表、Kafka 流、DNS 缓存更新)的线程数。
默 认 128,建议改成 cpu 个数的 2 倍(线程数)。
background_distributed_schedule_pool_size
设置为分布式发送执行后台任务的线程数,默认 16
建议改成 cpu个数的 2 倍(线程数)。
max_concurrent_queries
最大并发处理的请求数(包含 select,insert 等)
建议改成 cpu个数的 2 倍(线程数)。
max_threads
设置单个查询所能使用的最大 cpu 个数
默认是 cpu 核数
内存资源
max_memory_usage
此参数在 users.xml 中,表示单次 Query 占用内存最大值
可以设置的比较大,这样可以提升集群查询的上限。
保留一点给 OS,比如 128G 内存的机器,设置为 100GB。
max_bytes_before_external_group_by
当 group 使用内存超过阈值后会刷新到磁盘进行。
一般按照 max_memory_usage 的一半设置内存
clickhouse 聚合分两个阶段:查询并及建立中间数据、合并中间数据,结合上一项,建议 50GB。
max_bytes_before_external_sort
当 order by 已使用 max_bytes_before_external_sort 内存就进行溢写磁盘(基于磁盘排序)
如果不设置该值,那么当内存不够时直接抛错,设置了该值 order by 可以正常完成,
但是速度相对存内存来说肯定要慢点(实测慢的非常多,无法接受)。
但是速度相对存内存来说肯定要慢点(实测慢的非常多,无法接受)。
max_table_size_to_drop
此参数在 config.xml 中,表示删除超过该大小的分区表会失败
默认50G,建议修改为0 ,即可以删除任意大小的分区表
存储
多目录
ClickHouse 不支持设置多数据目录
但是,可以挂在虚拟券组,一个劝阻绑定多块物理磁盘提升数据io性能
多数查询场景SSD会比普通机械硬盘快2-3倍
语法优化规则
概述
clickhouse的SQL优化是基于RBO(Rule Based Optimization)
COUNT优化
使用count() 或者 count(*),且没有 where 条件时,直接使用system.tables 的 total_rows
EXPLAIN SELECT count() FROM datasets.hits_v1;
消除子查询重复
子查询中有两个重复的字段,会被去重(即使是重命名成其他字段)
注意关键词:子查询、重复字段
谓词下推
当 group by 有 having 子句,但是没有 with cube、with rollup 或者 with totals 修饰的时候,
having 过滤会下推到 where 提前过滤。
having 过滤会下推到 where 提前过滤。
EXPLAIN SYNTAX SELECT UserID FROM hits_v1 GROUP BY UserID HAVING UserID = '8585742290196126178';
子查询也支持谓词下推
聚合计算外推
聚合函数内的计算外推
EXPLAIN SYNTAX SELECT sum(UserID * 2) FROM visits_v1
聚合函数消除
对聚合键,group by key 使用 min、max、any 聚合函数,则将函数消除
EXPLAIN SYNTAX SELECT sum(UserID * 2), max(VisitID),max(UserID) FROM visits_v1 GROUP BY UserID
删除重复的orderby key
去重重复的聚合键
EXPLAIN SYNTAX
SELECT * FROM visits_v1 ORDER BY UserID ASC,UserID ASC,VisitID ASC,VisitID ASC
SELECT * FROM visits_v1 ORDER BY UserID ASC,UserID ASC,VisitID ASC,VisitID ASC
删除重复的limit by key
EXPLAIN SYNTAX
SELECT CodeVersion FROM visits_v1 LIMIT 3 BY VisitID, VisitID LIMIT 10
SELECT CodeVersion FROM visits_v1 LIMIT 3 BY VisitID, VisitID LIMIT 10
删除重复的 USING Key
一般人不会犯的错误,犯了就转行吧
标量替换
如果子查询只返回一行数据,在被引用的时候用标量替换
EXPLAIN SYNTAX
WITH
(SELECT sum(bytes)
FROM system.parts
WHERE active) AS total_disk_usage
SELECT
(sum(bytes) / total_disk_usage) * 100 AS table_disk_usage,table
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10;
WITH
(SELECT sum(bytes)
FROM system.parts
WHERE active) AS total_disk_usage
SELECT
(sum(bytes) / total_disk_usage) * 100 AS table_disk_usage,table
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10;
三元运算优化
开启了 optimize_if_chain_to_multiif 参数,三元运算符会被替换成 multiIf 函数
EXPLAIN SYNTAX
SELECT number = 1 ? 'hello' : (number = 2 ? 'world' : 'atguigu')
FROM numbers(10)
settings optimize_if_chain_to_multiif = 1;
SELECT number = 1 ? 'hello' : (number = 2 ? 'world' : 'atguigu')
FROM numbers(10)
settings optimize_if_chain_to_multiif = 1;
查询优化
单表查询
查询优化
Prewhere 替代 where
Prewhere 和 where 语句的作用相同,都是用来过滤数据
不同之处
prewhere 只支持*MergeTree 族系列引擎的表
首先会读取指定的列数据,来判断数据过滤,等待数据过滤之后再读取 select 声明的列字段来补全其余属性。
查询列明显多于筛选列时使用 Prewhere 可十倍提升查询性能,
Prewhere 会自动优化执行过滤阶段的数据读取方式,降低 io 操作。
Prewhere 会自动优化执行过滤阶段的数据读取方式,降低 io 操作。
默认情况下我们会开启prewhere 并且 会自动优化成prewhere
但是在某些情况下需要我们手动指定prewhere
使用常量表达式
使用默认值为 alias 类型的字段
包含了 arrayJOIN,globalIn,globalNotIn 或者 indexHint 的查询
select 查询的列字段和 where 的谓词相同
使用了主键字段
数据采样
通过采样运算可极大提升数据分析的性能
采样修饰符只有在 MergeTree engine 表中才有效,且在创建表时需要指定采样策略。
列裁剪与分区裁剪
选择需要的分区、列减少io
orderby 结合 where、limit
减少数据量
避免构建虚拟列
虚拟列非常消耗资源浪费性能
可以考虑在前端进行处理,或者在表中构造实际字段进行额外存储。
uniqCombined 替代 distinct
uniqCombined 底层采用类似 HyperLogLog 算法实现,性能可提升 10 倍以上
会有2%左右的误差,在允许的情况下可以使用这个
而Count(distinct )会使用 uniqExact精确去重。
物化视图
后面介绍
其他
查询熔断
为单个查询设置超时时间
配置周期熔断,在一个周期内,限制用户查询次数
关闭虚拟内存
物理内存和虚拟内存的数据交换,会导致查询变慢,资源允许的情况下关闭虚拟内存。
配置 join_use_nulls
为每一个账户添加 join_use_nulls 配置,左表中的一条记录在右表中不存在,
右表的相应字段会返回该字段相应数据类型的默认值,而不是标准 SQL 中的 Null 值。
右表的相应字段会返回该字段相应数据类型的默认值,而不是标准 SQL 中的 Null 值。
批量写入时先排序
无序的数据或者涉及的分区太多,会导致 ClickHouse 无法及时对新导入的数据进行合并,从而影响查询性能。
关注 CPU变化
在 50%左右会出现查询波动,达到 70%会出现大范围的查询超时
多表关联
用in代替on
当多表联查时,查询的数据仅从其中一张表出时,可考虑用 IN 操作而不是 JOIN
大小表关联
要满足小表在右的原则
右表关联时被加载到内存中与左表进行比较
ClickHouse 中无论是 Left join 、Right join 还是 Inner join
永远都是拿着右表中的每一条记录到左表中查找该记录是否存在,所以右表必须是小表。
永远都是拿着右表中的每一条记录到左表中查找该记录是否存在,所以右表必须是小表。
谓词下推
旧版本不会对join进行谓词下推,所以每个子查询应先过滤
新版本会优化
分布式表使用GLOBAL
两张分布式表上的 IN 和 JOIN 之前必须加上 GLOBAL 关键字,
右表只会在接收查询请求的那个节点查询一次,并将其分发到其他节点上
右表只会在接收查询请求的那个节点查询一次,并将其分发到其他节点上
果不加 GLOBAL 关键字的话,每个节点都会单独发起一次对右表的查询,
而右表又是分布式表,就导致右表一共会被查询 N²次
而右表又是分布式表,就导致右表一共会被查询 N²次
使用字典表
用字典替换join操作
字典常驻内存,所以字典表不宜太大
提前过滤
减少数据扫描
数据一致性
概述
官方:对数据一致性支持最好的Mergetree也仅仅支持最终一致性
原因
数据的去重只会在分区批量写入或数据合并的时候进行,但是后台合并时间不确定
我们无法针对此现象做处理,导致部分数据存在重复
当然我们可以用OPTIMIZE手动出发合并,但是会触发大量数据读写,不建议
对于ReplacingMergeTree、SummingMergeTree 此类聚合表引擎,会出现短暂数据不一致的情况
解决办法
手动 OPTIMIZE
在写入数据后,立刻执行 OPTIMIZE 强制触发新写入分区的合并动作。
触发大量数据读写,不建议
语法
OPTIMIZE TABLE [db.]name [ON CLUSTER cluster] [PARTITION partition |
PARTITION ID 'partition_id'] [FINAL] [DEDUPLICATE [BY expression]]
PARTITION ID 'partition_id'] [FINAL] [DEDUPLICATE [BY expression]]
通过 Group by 去重
我们可以增加一个deleted标记数据是否删除
比如 0 代表未删除,1 代表删除数据。
比如 0 代表未删除,1 代表删除数据。
argMax(field1,field2):按照 field2 的最大值取 field1 的值。
数据并没有被真正的删除,而是被过滤掉了。
在一些合适的场景下,可以结合 表级别的 TTL 最终将物理数据删除。
在一些合适的场景下,可以结合 表级别的 TTL 最终将物理数据删除。
通过 FINAL 查询
查询语句后加FINAL修饰符会执行 Merge 的特殊逻辑(例如数据去重,预聚合等)。
早期版本此修饰符会将查询变成单线程,查询速度慢
新版本(20.5.2.7-stable+)支持多线程,可通过max_final_threads
参数进行控制单个查询线程数量,但是目前读取part部分依旧串行
参数进行控制单个查询线程数量,但是目前读取part部分依旧串行
物化视图
概述
ClickHouse 的物化视图是一种查询结果的持久化,用户查起来跟表没有区别
创建的过程它是用了一个特殊引擎,加上后来 as select,就是 create 一个 table as select 的写法。
物化视图不会随着基础表的变化而变化,所以它也称为快照(snapshot)
物化视图与普通视图的差别
普通视图
普通视图不保存数据,保存的仅仅是查询语句
查询的时候还是从原表读取数据
可以理解为一个子查询
物化视图
物化视图则是把查询的结果根据相应的引擎存入到了磁盘或内存中
对数据重新进行了组织,可以堪称是一张新表
优缺点
优点
查询速度快
因为都计算好了,数据量少
缺点
本质是流式数据的使用场景,累加式的技术,所以需要用历史数据进行驱虫、去核
而如果一张表加了很多物化视图,在写入表的时候视图也会相应的写入,会消耗更多的机器资源,如带宽沾满、存储增加
基本语法
create 语法,会创建一个隐藏的目标表来保存视图数据。
也可以 TO 表名,保存到一张显式的表。
没有加 TO 表名,表名默认就是 .inner.物化视图名
CREATE [MATERIALIZED] VIEW [IF NOT EXISTS] [db.]table_name [TO[db.]name]
[ENGINE = engine] [POPULATE] AS SELECT ...
[ENGINE = engine] [POPULATE] AS SELECT ...
限制
必须指定物化视图的 engine 用于数据存储
TO [db].[table]语法的时候,不得使用 POPULATE。
POPULATE 关键字决定了物化视图的更新策略:
查询语句(select)可以包含下面的子句: DISTINCT, GROUP BY, ORDER BY, LIMIT…
物化视图的 alter 操作有些限制,操作起来不大方便
若物化视图的定义使用了 TO [db.]name 子语句,则可以将目标表的视图 卸载DETACH 再装载 ATTACH
物化视图的更新
物化视图创建好之后,若源表被写入新数据则物化视图也会同步更新
POPULATE决定了物化视图的更新策略
若有POPULATE 则在创建视图的过程会将源表已经存在的数据一并导入,类似于 create table ... as
若无POPULATE 则物化视图在创建之后没有数据
ClickHouse 官方并不推荐使用populated,因为在创建视图过程中插入表中的数据并不会写入视图,会造成数据的丢失。
物化视图不支持同步删除
源表删除视图数据还在
物化视图是一种特殊的数据表,可以用 show tables 查看,可以删除视图数据、可以删除视图
MaterializeMySQL 引擎
概述
ClickHouse 20.8.2.3 版本新增加了 MaterializeMySQL 的 database 引擎
该引擎能映射到Mysql的某个database,并自动在ClickHouse中建立对应的 ReplacingMergeTree
原理类似canal、Maxwell,ClickHouse 服务做为 MySQL 副本,
读取 Binlog 并执行 DDL 和 DML 请求,实现了基于 MySQL Binlog 机制的业务数据库实时同步功能。
读取 Binlog 并执行 DDL 和 DML 请求,实现了基于 MySQL Binlog 机制的业务数据库实时同步功能。
特点
MaterializeMySQL 同时支持全量和增量同步,
在 database 创建之初会全量同步MySQL 中的表和数据,之后则会通过 binlog 进行增量同步
在 database 创建之初会全量同步MySQL 中的表和数据,之后则会通过 binlog 进行增量同步
MaterializeMySQL database 为其所创建的每张 ReplacingMergeTree 自动增加了_sign 和 _version 字段。
此处可以类比我们处理数据一致性的时候加的deleted标记
_version 用作 ReplacingMergeTree 的 ver 版本参数,每当监听到 insert、update 和 delete 事件时,在 databse 内全局自增
类比create_time字段
类比create_time字段
_sign 则用于标记是否被删除,取值 1 或 者 -1。
类比deleted标签
类比deleted标签
目前支持四种binlog事件
MYSQL_WRITE_ROWS_EVENT
_sign = 1,_version ++
MYSQL_DELETE_ROWS_EVENT
_sign = -1,_version ++
MYSQL_UPDATE_ROWS_EVENT
新数据 _sign = 1
MYSQL_QUERY_EVENT
支持 CREATE TABLE 、DROP TABLE 、RENAME TABLE 等。
使用细则
DDL 查询
MySQL DDL 查询被转换成相应的 ClickHouse DDL 查询
如(ALTER, CREATE, DROP, RENAME),clickhouse不能解析的ddl查询直接忽略
数据复制
MaterializeMySQL 不支持直接插入、删除和更新查询,
而是将 DDL 语句进行相应转换
而是将 DDL 语句进行相应转换
MySQL INSERT 查询被转换为 INSERT with _sign=1
MySQL DELETE 查询被转换为 INSERT with _sign=-1
MySQL UPDATE 查询被转换成 INSERT with _sign=1 和 INSERT with _sign=-1。
SELECT 查询
如果在 SELECT 查询中没有指定_version,
则使用FINAL 修饰符,返回_version 的最大值对应的数据,即最新版本的数据。
则使用FINAL 修饰符,返回_version 的最大值对应的数据,即最新版本的数据。
如果在 SELECT 查询中没有指定_sign,
则默认使用 WHERE _sign=1,即返回未删除状态(_sign=1)的数据。
则默认使用 WHERE _sign=1,即返回未删除状态(_sign=1)的数据。
索引转换
ClickHouse 数据库表会自动将 MySQL 主键和索引子句转换为 ORDER BY 元组。
ClickHouse 只有一个物理顺序,由 ORDER BY 子句决定。如果需要创建新的物理顺序,请使用物化视图。
配置方法
mysql配置
mysql开启binlog,且为Row
(打开/etc/my.cnf,在[mysqld]下添加)
(打开/etc/my.cnf,在[mysqld]下添加)
server-id=1
log-bin=mysql-bin
binlog_format=ROW
log-bin=mysql-bin
binlog_format=ROW
开启GTID模式
gtid-mode=on
enforce-gtid-consistency=1 # 设置为主从强一致性
log-slave-updates=1 # 记录日志
enforce-gtid-consistency=1 # 设置为主从强一致性
log-slave-updates=1 # 记录日志
重启Mysql
sudo systemctl restart mysqld
clickhouse配置
开启 ClickHouse 物化引擎
set allow_experimental_database_materialize_mysql=1;
创建复制管道
ClickHouse 中创建 MaterializeMySQL 数据库
CREATE DATABASE test_binlog ENGINE =
MaterializeMySQL('hadoop1:3306','testck','root','000000');
MaterializeMySQL('hadoop1:3306','testck','root','000000');
4 个参数分别是 MySQL 地址、databse、username 和 password。
查看 ClickHouse 的数据
use test_binlog;
show tables;
show tables;
查看对应表
查询小技巧
查询过程中中增加 _sign 和 _version 虚拟字段,可以帮助我们理解底层数据插入原理
在查询时,对于已经被删除的数据,_sign=-1,ClickHouse 会自动重写 SQL,将 _sign = -1 的数据过滤掉;
对于修改的数据,则自动重写 SQL,为其增加 FINAL 修饰符。
删除表
Mysql删除表ClickHouse也会删除表
新建表
Mysql新建表,ClickHouse也会新建表
常见问题
分布式 DDL 某数据节点的副本不执行
问题
使用分布式 ddl 执行命令 create table on cluster xxxx 某个节点上没有创建
表,但是 client 返回正常,查看日志有报错,见注释
表,但是 client 返回正常,查看日志有报错,见注释
解决办法
解决办法:重启该不执行的节点。
数据副本表和数据不一致
问题
由于某个数据节点副本异常,导致两数据副本表不一致,
某个数据副本缺少表,需要将两个数据副本调整一致。
某个数据副本缺少表,需要将两个数据副本调整一致。
解决办法
在缺少表的数据副本节点上创建缺少的表,创建为本地表,表结构可以在其他数据副本通过 show crete table xxxx 获取。
表结构创建后,clickhouse 会自动从其他副本同步该表数据,验证数据量是否一致即可。
副本节点全量恢复
问题
某个数据副本异常无法启动,需要重新搭建副本
解决办法
清空异常副本节点的 metadata 和 data 目录。
从另一个正常副本将 metadata 目录拷贝过来(这一步之后可以启动数据库,但是只有表结构没有数据)。
执行 sudo -u clickhouse touch /data/clickhouse/flags/force_restore_data
启动数据库
数据副本启动缺少 zk 表
问题
某个数据副本表在 zk 上丢失数据,或者不存在,
但是 metadata 元数据里存在,导致启动异常
但是 metadata 元数据里存在,导致启动异常
解决办法
metadata 中移除该表的结构文件,如果多个表报错都移除
mv metadata/xxxxxx/xxxxxxxx.sql /tmp/
启动数据库
手工创建缺少的表,表结构从其他节点 show create table 获取。
创建后会自动同步数据,验证数据是否一致。
ZK table replicas 数据未删除,导致重建表报错
问题
重建表过程中,先使用 drop table xxx on cluster xxx ,各节点在 clickhouse 上table 已物理删除
但是 zk 里面针对某个 clickhouse 节点的 table meta 信息未被删除(低概率事件)
zk 里仍存在该表的 meta 信息,导致再次创建该表 create table xxx on cluster, 该节点无法创建表(其他节点创建表成功)
解决办法
从其他数据副本 cp 该 table 的 metadata sql 过来.
重启节点。
Clickhouse 节点意外关闭
问题
在大量 insert 数据的情况下,某个节点意外宕机
现象
数据写入不受影响、数据查询不受影响、建表 DDL 执行到异常节点会卡住
副本机制,读写不受影响
副本机制,读写不受影响
解决办法
启动异常节点,期间其他副本写入数据会自动同步过来,其他副本的建表 DDL 也会同步。
其他问题
监控及备份
监控
概述
Clickhouse会将运行式的状态记录到众多系统表中(system.*)中,但是直接从中监控数据有些许不足
不足之处
过于底层,不直观,理想是可视化
ClickHouse只记录了自己的指标,有时候我们还关注zk、服务器IO、cpu等
解决办法
目前流行Prometheus + Grafana的组合方式
集成多框架,包括各种服务器的负载
Prometheus收集各类系统运行指标
Grafana负责可视化
Clickhouse从20.1.2.4内置对接Prometheus功能,可视为Prometeus的Endpoint服务,自动将metrics、envents、asynchronous_metrics三张系统表发送Prometheus
安装
百度
监控模板
官网
备份/恢复
官网
备份/恢复方式
备份
手动
子主题
创建用于存放备份数据的目录 shadow
sudo mkdir -p /var/lib/clickhouse/shadow/
如果目录存在,先清空目录下的数据
执行备份命令
echo -n 'alter table t_order_mt freeze' | clickhouse-client
将备份数据保存到其他路径
#创建备份存储路径
sudo mkdir -p /var/lib/clickhouse/backup/
sudo mkdir -p /var/lib/clickhouse/backup/
#拷贝数据到备份路径
sudo cp -r /var/lib/clickhouse/shadow/ /var/lib/clickhouse/backup/my-backup-name
sudo cp -r /var/lib/clickhouse/shadow/ /var/lib/clickhouse/backup/my-backup-name
#为下次备份准备,删除 shadow 下的数据
sudo rm -rf /var/lib/clickhouse/shadow/*
sudo rm -rf /var/lib/clickhouse/shadow/*
clickhouse-backup
可以实现自动化备份
这玩意得找好对应版本,不然用不了
配置丰富
不过这玩意一半丢给运维,自己搞死脑细胞
恢复
手动
将备份复制到 detached 目录
sudo cp -rl
backup/my-backup-name/1/store/cb1/cb176503-cd88-4ea8-8b17-6503cd888ea8/* data/default/t_order_mt/detached/
backup/my-backup-name/1/store/cb1/cb176503-cd88-4ea8-8b17-6503cd888ea8/* data/default/t_order_mt/detached/
注意:仅拷贝分区目录,注意目录所属的用户要是 clickhouse
执行 attach
echo 'alter table t_order_mt attach partition 20200601' | clickhouse-client
查看数据
Elasticsearch
冷热主从架构设计
简介
基于Lucene的分布式, 实时文档存储, 实时分析搜索引擎
特点
分布式
全文检索
ES架构
Gateway
Distributed Lucene Directory
ES的模块
索引模块(Index Module)
搜索模块(Search Module)
映射解析模块(Mapping)
Discovery、Scripting和第三方插件
Discovery是ES的节点发现模块,不同机器上的ES节点要组成集群需要进行消息通信,集群内部需要选举master节点,这些工作都是由Discovery模块完成。支持多种发现机制,如 Zen 、EC2、gce、Azure。
Scripting用来支持在查询语句中插入javascript、python等脚本语言,scripting模块负责解析这些脚本,使用脚本语句性能稍低。ES也支持多种第三方插件
ES的传输模块和JMX
Restful API
基础
可视化界面工具
elasticsearch-head
kibana
基本概念
集群
节点
索引
类型
文档
分片
副本
索引定义
索引(名词) 一个索引(index)就像是传统关系数据库中的数据库,它是相关文档存储的地
索引(动词) 「索引一个文档」表示把一个文档存储到索引(名词)里,以便它可以被检索或者查询。
倒排索引 传统数据库为特定列增加一个索引,例如B-Tree索引来加速检索。Elasticsearch和Lucene使用一种叫做倒排索引(inverted index)的数据结构来达到相同目的。
映射Mapping
核心数据类型
字符串类型
text 用于索引全文值的字段
keyword 用于索引结构化内容的字段, 只能按其确切值进行搜索
数字类型
long, integer, short, byte, double, float, half_float, scaled_float
布尔型: boolean + 日期: date
动态映射
当索引一个包含新域的文档—之前未曾出现-- Elasticsearch 会使用 动态映射 ,通过JSON中基本数据类型,尝试猜测域类型,使用如下规则
别名的使用
由于es的数据不可变性, es允许在索引中新增映射类型, 但是不允许修改已有的映射类型
无缝的从一个索引切换到另一个索引
API使用
分布式工作原理
集群
一个Elasticsearch服务实例就是一个节点(Node)
主节点
1. 不需要涉及到文档级别的变更和搜索等操作
2. 维护并更新Cluster 状态
所有节点信息
所有索引和其相关的Mapping和setting信息
分片路由信息
3. 节点的加入集群及移除
4. 分片的平衡
协调节点
1. 所有节点默认都是 协调节点
2. 请求可以发送到集群中的任一节点, 每个节点都有能力处理任意请求
3. 每个节点都知道集群中任一文档位置,所以可以直接将请求转发到需要的节点上
4. 负责数据的存储和读写的,写入索引,搜索数据,data node
默认一旦master宕机, 所有的写操作都会被拒绝,但是读操作是被允许的
发现机制+选主
Zen Discovery
发现机制
单播列表
discovery.zen.ping.unicast.hosts -- 单播列表
1. 节点启动后, 先ping单播列表中的host(建议配置3个Master-eligible-Node节点的host)
2. 假如单播列表的host参数不存在则ping localhost, 这样默认就是开发集群
3. 当节点联系到单播列表中的成员时,它就会得到整个集群所有节点的状态
4. 联系master节点, 请求加入集群
文件配置发现
通过外部文件提供主机列表
discovery.zen.hosts_provider:file
discovery.zen.hosts_provider:file
如果要让多个node组成一个es集群,首先第一个要设置的参数,就是cluster.name,多个node的cluster.name如果一样,才满足组成一个集群的基本条件。
选主
选主机制
activeMasters / masterCandidates 说明
activeMasters 启用中列表 -- 其他节点认为的master节点
masterCandidates 候选列表 -- 具有候选资格的节点, master.node配置为true旳节点
优先从activeMasters列表中选取
选主流程
1. ping所有节点, 获得并合并所有的 [activeMasters 启用中列表] 和 [masterCandidates 候选列表]
2. 从activeMasters列表选举Master节点
3. 如果activeMaster列表为空, 则从masterCandidates列表选举Master节点
4. 判断临时Master是否是本节点
是: 等待其他节点选我
否: 不接受其他节点的join请求,并向Master节点发送加入请求
选主算法
Bully算法
假定所有节点都有一个唯一的ID,使用该ID对节点进行排序,选择最小的节点作为Master
节点失效检测
主节点
定时ping所有节点
当有节点连不上时, 会执行removeNode.
当在线节点低于法定数量, 会放弃master身份执行rejoin以避免脑裂
当有节点连不上时, 会执行removeNode.
当在线节点低于法定数量, 会放弃master身份执行rejoin以避免脑裂
非主节点
定期pingMaster节点是否活跃,Master下线则触发rejoin重新选举
选主最少节点限制
discovery.zen.minimum_master_nodes
(master_eligible_nodes)/2+1
(master_eligible_nodes)/2+1
1. 参选主节点数达不到最小值的限制,则等待,直到节点数足够可以开始选举
2. 如果只有一个 local 节点那就选出的是自己
最新版本ES 7已经移除minimum_master_nodes配置,让Elasticsearch自己选择可以形成仲裁的节点。
脑裂
原因
网络
节点响应超时
节点负载过大
es实例无响应, 例如较大规模的内存回收操作(STW)
避免方法
1. discovery.zen.ping_timeout(默认值是3秒)
适当增大timeout时间
适当增大timeout时间
2. node.master: true node.data: false
master Node 和 Data Node 分离 -- 默认情况下这两个属性的值都是true
master Node 和 Data Node 分离 -- 默认情况下这两个属性的值都是true
处理方法
1. 重启集群
2. 注意节点的启动顺序, 因为2份数据不一致, 先启动的节点会成为主节点
集群故障检测
通过Ping的方式互检查
主节点负责Ping所有其他节点,判断是否有节点已经挂掉
其他节点也通过Ping的方式判断主节点是否处于可用状态
discovery.zen.fd.ping_interval 节点多久ping一次,默认1s
discovery.zen.fd.ping_timeout 等待响应时间,默认30s
discovery.zen.fd.ping_retries 失败或超时后重试的次数,默认3
处理并发冲突
乐观锁
版本号
Elasticsearch 中对文档的 索引 ,GET 和 delete 请求时,我们指出每个文档都有一个 _version (版本)号,当文档被修改时版本号递增
如果旧版本的文档在新版本之后到达,它可以被简单的忽略
检测变更的版本号是否等于ES中的当前文档版本号
外部系统
可以借助外部系统使用版本控制
PUT /website/blog/2?version=5&version_type=external
检测变更的版本号是否大于ES中的当前文档版本号
No master block 集群缺失主节点
discovery.zen.no_master_block
all 所有操作均不可做,读写、包括集群状态的读写api
write 默认为write,写操作被拒绝执行,基于最后一次已知的正常的集群状态可读
文档分片路由
shard = hash(routing) % number_of_primary_shards
分片数创建索引时便确定, 并且不能修改, 因为分片数涉及到分片路由
交互原理
文档 新建/索引/删除 交互
1. 客户端向 Node 1 发送新建、索引或者删除请求。
2. 节点使用文档的 _id 确定文档属于分片 0 。请求会被转发到 Node 3,因为分片 0 的主分片目前被分配在 Node 3 上
3. Node 3 在主分片上面执行请求。如果成功了,它将请求并行转发到 Node 1 和 Node 2 的副本分片上。一旦所有的副本分片都报告成功, Node 3 将向协调节点报告成功,协调节点向客户端报告成功
在客户端收到成功响应时,文档变更已经在主分片和所有副本分片执行完成,变更是安全的
文档修改
1. 客户端向 Node 1 发送更新请求。
2. 它将请求转发到主分片所在的 Node 3 。
3. Node 3 从主分片检索文档,修改 _source 字段中的 JSON ,并且尝试重新索引主分片的文档。 如果文档已经被另一个进程修改,它会重试步骤 3 ,超过 retry_on_conflict 次后放弃。
4. 如果 Node 3 成功地更新文档,它将新版本的文档并行转发到 Node 1 和 Node 2 上的副本分片,重新建立索引。 一旦所有副本分片都返回成功, Node 3 向协调节点也返回成功,协调节点向客户端返回成功。
在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡
优化
ES内部压缩方案
倒排列表 - postings list 的POR压缩
Filter cache 的RBM压缩
分片策略
分片数设置
1. 每个分片占用的硬盘容量不超过ES的最大JVM的堆空间设置(一般设置不超过32G)
预估索引的容量设置分片数, 例如320G的索引容量则至少需要10个分片
预估索引的容量设置分片数, 例如320G的索引容量则至少需要10个分片
2. 分片数不超过节点数的3倍
推迟副本分片分配
默认情况,集群会等待一分钟来查看节点是否会重新加入
通过修改参数 delayed_timeout ,可以延长再均衡的时间
通过修改参数 delayed_timeout ,可以延长再均衡的时间
索引优化
1. 尽量避免使用nested或 parent/child Mapping类型
2. 如果一定要使用nested fields,保证nested fields字段不能过多,目前ES默认限制是50
3. 控制索引的字段数量、mapping深度、索引字段的类型
4. 不需要做模糊检索的字段使用 keyword类型代替 text 类型,这样可以避免在建立索引前对这些文本进行分词
5. 对于那些不需要聚合和排序的索引字段禁用Doc values
查询效率
1. 使用批量请求,批量索引的效率肯定比单条索引的效率要高
2. query_string 或 multi_match 的查询字段越多, 查询越慢。
可以在 mapping 阶段,利用 copy_to 属性将多字段的值索引到一个新字段,multi_match时,用新的字段查询。
可以在 mapping 阶段,利用 copy_to 属性将多字段的值索引到一个新字段,multi_match时,用新的字段查询。
3. 日期字段的查询, 尤其是用now 的查询实际上是不存在缓存的,因此, 可以从业务的角度来考虑是否一定要用now, 毕竟利用 query cache 是能够大大提高查询效率的。
4. 查询结果集的大小不能随意设置成大得离谱的值, 如query.setSize不能设置成 Integer.MAX_VALUE, 因为ES内部需要建立一个数据结构来放指定大小的结果集数据。
5. 避免层级过深的聚合查询, 层级过深的group by , 会导致内存、CPU消耗,建议在服务层通过程序来组装业务,也可以通过pipeline 的方式来优化。
6. cache的设置+使用
filter查询(QueryCache)
聚类或排序(FieldDataCache)
ShardRequestCache
倒排索引-数据结构
单词 - 文档矩阵
单词 - 文档矩阵是表达两者之间所具有的一种包含关系的概念模型
倒排索引基本概念
文档(Document)
文档集合(Document Collection)
文档编号(Document ID)
单词编号(Word ID)
倒排索引(Inverted Index)
单词词典
单词词典是由文档集合中出现过的所有单词构成的字符串集合,单词词典内每条索引项记载单词本身的一些信息以及指向“倒排列表”的指针
倒排列表
倒排列表记载了出现过某个单词的所有文档的文档列表及单词在该文档中出现的位置信息,每条记录称为一个倒排项(Posting)。根据倒排列表,即可获知哪些文档包含某个单词
文档频率信息(有多少个文档包含这个单词)
文档编号(哪个文档包含这个单词)
单词频率信息(这个单词在文档中的出现次数)
单词在文档出现的位置信息
倒排文件
倒排文件是存储倒排索引的物理文件
索引原理
倒排索引被写入磁盘后是不可改变 的:它永远不会修改
不需要锁
一旦索引被读入内核的文件系统缓存,便会留在哪里
其它缓存(像filter缓存),在索引的生命周期内始终有效。它们不需要在每次数据改变时被重建,因为数据不会变化。
写入单个大的倒排索引允许数据被压缩,减少磁盘 I/O 和 需要被缓存到内存的索引的使用量。
存储原理
概念
Lucene 中 按段搜索
段 segment
一个索引文件拆分为多个子文件,则每个子文件叫作段
提交点
当段被批量的写入磁盘, 则会生成一个提交点
事务日志 translog
新建索引 -(1)-> 内存缓存 -(2)-> 文件系统缓存 -(3)-> 磁盘
步骤
1. 用户创建了一个新文档,新文档被写入[内存缓存]中 --未可读
2. 每间隔1秒, [内存缓存]中数据会以段的形式写入[文件系统缓存] --可读
3. [文件系统缓存]触发flush刷新, 段被全量提交, 且一个提交点被写入[磁盘] --持久化
分片每30分钟自动刷新(flush)
translog 太大的时候触发刷新
持久化
由于创建新文档时, 数据并不直接落到磁盘, 因此在断电时会存在数据丢失的风险
1. 用户创建一个文档时, 新文档写入[内存缓存]的同时, 也会被追加到[tanslog]
2. 随着 [translog] 变得越来越大,达到一定程度后索引被刷新,在刷新flush之后,段被全量提交,一个提交点被写入硬盘,并且[translog]被清空
段合并
由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增
每一个段都会消耗文件句柄、内存和cpu运行周期。
每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢
1. 小的段被合并到大的段,然后这些大的段再被合并到更大的段
2. 段合并的时候会将那些旧的已删除文档从文件系统中清除
搜索API
查询/搜索
分页
from, size
分页原理
1. 假设查询请求from=10, size=5
2. 协调节点把查询请求分发到各分片中, 分片查询返回其排序前15的数据到协调节点
3. 假设索引分片数=4, 则协调节点最多会收到60条数据, 并重新进行排序返回5条给客户端
在分布式系统中,对结果排序的成本随分页的深度成指数上升
_all
当索引一个文档的时候,Elasticsearch 取出所有字段的值拼接成一个大的字符串,作为 _all 字段进行索引
除非设置特定字段,否则查询字符串就使用 _all 字段进行搜索
请求体查询
filltter与query的区别: fillter不影响评分
query
fillter
1. 通过倒排索引获取包含该 term 的所有文档
2. 创建 bitset, 一个段对应一个bitset
3. 迭代 bitset(s)
一旦为每个查询生成了 bitsets ,Elasticsearch 就会循环迭代 bitsets 从而找到满足所有过滤条件的匹配文档的集合
一旦为每个查询生成了 bitsets ,Elasticsearch 就会循环迭代 bitsets 从而找到满足所有过滤条件的匹配文档的集合
4. 增量使用计数
查询在最近的 256 次查询中会被用到,那么它就会被缓存到内存中
当 bitset 被缓存后,缓存会在那些低于 10,000 个文档(或少于 3% 的总索引数)的段(segment)中被忽略
query叶子体
match 标准查询
如果在一个全文字段上使用 match 查询,在执行查询前,它将用正确的分析器去分析查询字符串
如果在一个精确值的字段上使用它,那么它将会精确匹配给定的值
multi_match 多个字段上执行相同的 match 查询
range 查询找出那些落在指定区间内的数字或者时间
term 查询被用于精确值匹配
terms 查询和 term 查询一样,但它允许你指定多值进行匹配
exists 查询和 missing 查询被用于查找那些指定字段中有值 (exists) 或无值 (missing) 的文档
bool组合查询
must 文档 必须 匹配这些条件才能被包含进来
must_not 文档 必须不 匹配这些条件才能被包含进来
should 如果满足这些语句中的任意语句,将增加 _score ,否则,无任何影响。它们主要用于修正每个文档的相关性得分
filter 必须 匹配,但它以不评分、过滤模式来进行。这些语句对评分没有贡献,只是根据过滤标准来排除或包含文档。
constant_score查询
以非评分模式来执行 term 查询并以1作为统一评分
分布式检索
查询阶段
1. 客户端发送搜索请求到协调节点
2. 查询请求被转发到索引的每个主分片或副本分片中, 分片在本地执行查询并添加结果到大小为 from + size 的本地有序优先队列中
3. 分片返回各自优先队列中所有文档的 ID 和排序值给协调节点
取回阶段
1. 协调节点辨别出哪些文档需要被取回并向相关的分片提交多个 GET 请求
2. 每个分片加载并 丰富 文档(如果有需要的话),接着返回文档给协调节点
3. 一旦所有的文档都被取回了,协调节点返回结果给客户端。
scroll 游标查询
Hbase
Hbase架构模型&读写流程
读
1、Client访问zookeeper,获取hbase:meta所在RegionServer的节点信息
2、Client访问hbase:meta所在的RegionServer,获取hbase:meta记录的元数据后先加载到内存中,然后再从内存中根据需要查询的RowKey查询出RowKey所在的Region的相关信息(Region所在RegionServer)
3、Client访问RowKey所在Region对应的RegionServer,发起数据读取请求
4、RegionServer构建RegionScanner(需要查询的RowKey分布在多少个Region中就需要构建多少个RegionScanner),用于对该Region的数据检索
5、RegionScanner构建StoreScanner(Region中有多少个Store就需要构建多少个StoreScanner,Store的数量取决于Table的ColumnFamily的数量),用于对该列族的数据检索
6、多个StoreScanner合并构建最小堆(已排序的完全二叉树)StoreHeap:PriorityQueue<StoreScanner>
7、StoreScanner构建一个MemStoreScanner和一个或多个StoreFileScanner(数量取决于StoreFile数量)
8、过滤掉某些能够确定所要查询的RowKey一定不在StoreFile内的对应的StoreFileScanner或MemStoreScanner
9、经过筛选后留下的Scanner开始做读取数据的准备,将对应的StoreFile定位到满足的RowKey的起始位置
10、将所有的StoreFileScanner和MemStoreScanner合并构建最小堆KeyValueHeap:PriorityQueue<KeyValueScanner>,排序的规则按照KeyValue从小到大排序
11、从KeyValueHeap:PriorityQueue<KeyValueScanner>中经过一系列筛选后一行行的得到需要查询的KeyValue
写
1、StoreFile实际保存数据的物理文件,StoreFile以HFile的形式存储在HDFS上,每个Store会有一个或多个StoreFile,数据在每个StoreFile内都是有序的。在HDFS/hbase/data/default/user目录下
2、MemStore写缓存。由于HFile中的数据要求是有序的,所以数据是先存储在MemStore中,排好序后,等到达刷写时机才会写入到HFile。每次刷写都会形成一个新的HFile
3、WAL由于数据要经过MemStore排序后才能刷写到HFile,但是数据在内存中会有很高的概率丢失。为了解决这个问题,数据会先写在一个叫做Write-Ahead-logfile(Hlog)的文件中,然后在写入MemStore中。当系统出现故障时,就可以从这个日志文件进行重建
4、数据写入流程
(1)client向HreginoServer发送写请求
(2)HRegionServer将数据写到WAL
架构
zookeeper
和zk做高可用架构、HRegionServer和Hmaster做主从架构
保证任何时候集群只有一个活跃的master
存贮所有Rsgion的寻址入口
实时监控化Region server的上线和下线信息。并实时通知Master
存储Hbase的schema和table元数据
Client包含访问Hbase的接口并维护cache来加快对Hbase的访问
Master
为Region server分配region
负责Region server的负载均衡
发现失效的Region server并重新分配其上的region
管理用户对table的增删改查
regionserver
regionserver维护region,处理对这些region的IO请求
regionserver负责切分在运行中变得过大的region
数据采集探针
Flume
Filebeat
MapReduce
Hadoop
入门
一、概念
1、Haddoop是什么
2、Hadoop发展历史
3、Hadoop的三大发行版本
4、Hadoop的优势
5、Hadoop的组成
HDFS架构概述
YARN架构概述
MapReduce架构概述
HDFS、YARN、MapReduce三者关系
6、大数据技术生态体系
7、推荐系统案例
二、Hadoop运行环境搭建
模板虚拟机环境准备
克隆虚拟机
在hadoop102安装Hadoop
Hadoop目录结构
三、Hadoop运行模式
本地运行模式(官方WordCount)
完全分布式运行模式(开发重点)
虚拟机准备
编写集群分发脚本xsync
SSH无密登录配置
集群配置
群起集群
配置历史服务器
配置日志的聚集
集群启动/停止方式总结
编写Hadoop集群常用脚本
常用端口号说明
集群时间同步
四、常见错误的解决方案
HDFS
一、概述
HDFS产出背景及定义
HDFS优缺点
HDFS组成架构
HDFS文件块大小(面试重点)
二、HDFS的Shell相关操作
基本语法
命令大全
常用命令实操
准备工作
上传
下载
HDFS直接操作
三、HDFS的客户端API
客户端环境准备
HDFS的API案例实操
HDFS文件上传(测试参数优先级)
HDFS文件下载
HDFS文件更名和移动
HDFS删除文件和目录
HDFS文件详情查看
HDFS文件和文件夹判断
四、HDFS的读写流程
HDFS写数据流程
剖析文件写入
网络拓扑-节点距离计算
机架感知(副本存储节点选择)
HDFS读数据流程
五、NN和2NN
NN和2NN工作机制
Fsimage和Edits解析
CheckPoint时间设置
六、Datanode
DataNode工作机制
数据完整性
掉线时限参数设置
MapReduce
一、MapReduce概述
MapReduce定义
MapReduce优缺点
MapReduce核心思想
MapReduce进程
官方WordCount源码
常用数据序列化类型
MapReduce编程规范
WordCount案例实操
本地测试
提交到集群测试
二、序列化
序列化概述
自定义bean对象实现序列化接口
序列化案例实操
三、核心框架原理
InputFormat数据输入
切片与MapTask并行度决定机制
Job提交流程源码和切片源码详解
FileInputFormat切片机制
TextInputFormat
CombineTextInputFormat切片机制
CombineTextInputFormat案例实操
MapReduce工作流程
Shuffle机制
Partition分区
Partition分区案例实操
WritableComparable排序
WritableComparable排序案例实操(全排序)
WritableComparable排序案例实操(区内排序)
Combiner合并
Combiner合并案例实操
OutputFormat数据输出
OutputFormat接口实现类
自定义OutputFormat案例实操
MapTask工作机制
ReduceTask工作机制
ReduceTask并行度决定机制
MapTask & ReduceTask源码解析
MapReduce内核源码解析
Join应用
Reduce Join
Reduce Join案例实操
Map Join
Map Join案例实操
数据清洗(ETL)
MapReduce开发总结
四、压缩
概述
MR支持的压缩编码
压缩方式选择
Gzip压缩
Bzip2压缩
Lzo压缩
Snappy压缩
压缩位置选择
压缩参数配置
压缩实操案例
Map输出端采用压缩
Reduce输出端采用压缩
五、常见的问题及解决方案
Yarn
一、理论
1、Yarn基础架构
2、Yarn的工作机制
3、作业提交全过程
4、Yarn调度器和调度算法
FIFO
容量调度器
公平调度器
5、命令行操作Yarn
yarn application查看任务
yarn logs查看日志
yarn applicationattempt查看尝试运行的任务
yarn container查看容器
yarn node查看节点状态
yarn rmadmin更新配置
yarn queue查看队列
6、yarn生产环境核心参数
二、怎么玩
1、Yarn生产环境核心参数配置案例
2、容量调度器多队列提交案例
需求
配置多队列的容量调度器
向Hive队列提交任务
任务优先级
3、公平调度器案例
需求
配置多队列的公平调度器
测试提交任务
4、Yarn的Tool接口案例
生产调优手册
一、HDFS—核心参数
NameNode内存生产配置
NameNode心跳并发配置
开启回收站配置
二、HDFS—集群压测
测试HDFS写性能
测试HDFS读性能
三、HDFS—多目录
NameNode多目录配置
DataNode多目录配置
集群数据均衡之磁盘间数据均衡
四、HDFS—集群扩容及缩容
添加白名单
服役新服务器
服务器间数据均衡
黑名单退役服务器
五、HDFS—存储优化
纠删码
纠删码原理
纠删码案例实操
异构存储(冷热数据分离)
异构存储Shell操作
测试环境准备
HOT存储策略案例
WARM存储策略测试
COLD策略测试
ONE_SSD策略测试
ALL_SSD策略测试
LAZY_PERSIST策略测试
六、HDFS—故障排除
NameNode故障处理
集群安全模式&磁盘修复
慢磁盘监控
小文件归档
七、HDFS—集群迁移
Apache和Apache集群间数据拷贝
Apache和CDH集群间数据拷贝
八、MapReduce生产经验
MapReduce跑的慢的原因
MapReduce常用调优参数
MapReduce数据倾斜问题
九、Hadoop-Yarn生产经验
常用的调优参数
容量调度器使用
公平调度器使用
十、Hadoop综合调优
Hadoop小文件优化方法
Hadoop小文件弊端
Hadoop小文件解决方案
测试MapReduce计算性能
企业开发场景案例
需求
HDFS参数调优
MapReduce参数调优
Yarn参数调优
执行程序
Hadoop源码解析
一、RPC通信原理解析
二、NameNode启动源码解析
启动9870端口服务
加载镜像文件和编辑日志
初始化NN的RPC服务端
NN启动资源检查
NN对心跳超时判断
安全模式
三、DataNode启动源码解析
初始化DataXceiverServer
初始化HTTP服务
初始化DN的RPC服务端
DN向NN注册
向NN发送心跳
四、HDFS上传源码解析
create创建过程
DN向NN发起创建请求
NN处理DN的创建请求
DataStreamer启动流程
write上传过程
向DataStreamer的队列里面写数据
建立管道之机架感知(块存储位置)
建立管道之Socket发送
建立管道之Socket接收
客户端接收DN写数据应答Response
五、Yarn源码解析
Yarn客户端向RM提交作业
RM启动MRAppMaster
调度器任务执行(YarnChild)
六、MapReduce源码解析
Job提交流程源码和切片源码详解
MapTask & ReduceTask源码解析
七、Hadoop源码编译
前期准备工作
工具包安装
编译源码
Hive
hive的架构
表操作
create database[ if not exists ] db_hive;
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 分区
[CLUSTERED BY (col_name, col_name, ...) 分桶
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format] row format delimited fields terminated by “分隔符”
[STORED AS file_format]
[LOCATION hdfs_path]
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 分区
[CLUSTERED BY (col_name, col_name, ...) 分桶
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format] row format delimited fields terminated by “分隔符”
[STORED AS file_format]
[LOCATION hdfs_path]
alter table student set tblproperties('EXTERNAL'='TRUE');
create table student_partition2(
id int,
name string,
age int)
partitioned by (month string, day string)
row format delimited fields terminated by '\t';
id int,
name string,
age int)
partitioned by (month string, day string)
row format delimited fields terminated by '\t';
desc [ formatted ] student_partition3;
alter table student_partition3 add columns(address string);
alter table student_partition3 change column address address_id int;
alter table student_partition3 replace columns(deptno string, dname
string, loc string);
string, loc string);
alter table student_partition1 add partition(dt='20170602') partition(dt='20170603');
alter table student_partition1 drop partition (dt='20170602'),partition (dt='20170603');
show partitions student_partition1;
复合类型
array
例如: 存入 a:b:c
collection items terminated by ':'
array[0], array_contains('a'), size(colName)
map
例如: 存入 f:张#m:黄#s:李
collection items terminated by '#'
may keys terminated by ':'
map_keys(colName), map_values(colName), map['key'], size(colName)
struct
表数据导入方式
load data local inpath '/,,,' overwrite | into table tbName partition(dt="20190505")
insert into table student_partition1 partition(dt="2019-07-08") select * from student1;
create table if not exists tableName as select id, name from tableName;
create table if not exists student1(
id int,
name string)
row format delimited fields terminated by '\t'
location '/user/hive/warehouse/student1';
id int,
name string)
row format delimited fields terminated by '\t'
location '/user/hive/warehouse/student1';
create table student2 like student1;
export table student1 to '/export/student1';
import table student2 from '/export/student1';
export table student1 to '/export/student1';
import table student2 from '/export/student1';
表数据的导出方式
insert overwrite local directory '/opt/bigdata/export/student'
row format delimited fields terminated by ',' select * from student;
row format delimited fields terminated by ',' select * from student;
hdfs dfs -get /user/hive/warehouse/student/student.txt /opt/bigdata/data
bin/hive -e 'select * from default.student;' > /opt/bigdata/data/student1.txt
bin/hive -f sql文件 > /opt/bigdata/data/student1.txt
export table default.student to
表分区
静态分区
动态分区
创建普通表t_order
创建目标分区表 order_dynamic_partition
向普通表t_order加载数据
动态加载数据到分区表中
set hive.exec.dynamic.partition=true; //使用动态分区
set hive.exec.dynamic.partition.mode=nonstrict; //非严格模式
注意字段查询的顺序,分区字段放在最后面。否则数据会有问题。
insert into table order_dynamic_partition partition(order_time) select order_number,order_price,order_time from t_order;
将数据直接上传到分区目录(hdfs)上,让分区表和数据产生关联有哪些方式
上传数据后修复表
dfs -mkdir -p 分区目录
dfs -put 分区目录
msck repair table 表名 #刷新元数据
上传数据后添加分区
dfs -mkdir -p 分区目录
dfs -put 分区目录
alter table 表名 add partition()
分桶表
分桶是相对分区进行更细粒度的划分
set hive.enforce.bucketing=true;== 开启对分桶表的支持
set mapreduce.job.reduces=4;== 设置与桶相同的reduce个数(默认只有一个reduce)
--分桶表
create table user_buckets_demo(id int, name string)
clustered by(id)
into 4 buckets
row format delimited fields terminated by '\t';
create table user_buckets_demo(id int, name string)
clustered by(id)
into 4 buckets
row format delimited fields terminated by '\t';
--普通表
create table user_demo(id int, name string)
row format delimited fields terminated by '\t';
create table user_demo(id int, name string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/bigdata/data/buckets.txt' into table user_demo;
insert into table user_buckets_demo select * from user_demo;
抽样查询桶表的数据:tablesample抽样语句,语法:tablesample(bucket x out of y)
- x表示从第几个桶开始取数据
- y表示桶数的倍数,一共需要从 ==桶数/y== 个桶中取数据
- x表示从第几个桶开始取数据
- y表示桶数的倍数,一共需要从 ==桶数/y== 个桶中取数据
select * from user_buckets_demo tablesample(bucket 1 out of 2)
-- 需要的总桶数=4/2=2个
-- 先从第1个桶中取出数据
-- 再从第1+2=3个桶中取出数据
-- 需要的总桶数=4/2=2个
-- 先从第1个桶中取出数据
-- 再从第1+2=3个桶中取出数据
表数据压缩
Hive表中间数据压缩:shuffle阶段 落地到磁盘的数据
#设置为true为激活中间数据压缩功能,默认是false,没有开启
set hive.exec.compress.intermediate=true;
set hive.exec.compress.intermediate=true;
#设置中间数据的压缩算法
set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;
set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;
Hive表最终输出结果压缩:输出到HDFS的数据
set hive.exec.compress.output=true;
set mapred.output.compression.codec=
org.apache.hadoop.io.compress.SnappyCodec;
org.apache.hadoop.io.compress.SnappyCodec;
文件存储格式
textfile
sequencefile
orc
parquet
自定义函数
UDF 一进一出
定义一个类继承==org.apache.hadoop.hive.ql.UDF
重载evaluate方法
将程序打成jar包上传到linux服务器
在hive的命令行窗口创建函数
add jar xxxxx.jar== (linux上jar包的路径)
create [temporary] function [dbname.]function_name AS class_name;
- hive命令行中删除函数
Drop [temporary] function [if exists] [dbname.]function_name;
创建永久函数
create function toUpper as 'com.kaikeba.udf.MyUDF' using jar 'hdfs://node1:9000/jars/hive_udf.jar';
UDAF 多进一出
UDTF一进多出
序列化/反序列化:SerDe
通过MultiDelimitSerDe 解决多字符分割场景
create table t1 (id String, name string)
row format serde 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
WITH SERDEPROPERTIES ("field.delim"="##");
row format serde 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
WITH SERDEPROPERTIES ("field.delim"="##");
通过RegexSerDe 解决多字符分割场景
create table t2(id int, name string)
row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "^(.*)\\#\\#(.*)$");
row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "^(.*)\\#\\#(.*)$");
通过JsonSerDe格式存储text文件
CREATE TABLE t3(id int, name string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE;
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE;
使用 json函数 操作json格式数据
get_json_object
select get_json_object(jsoncontext,"$.name") as name from t4;
json_tuple
select json_tuple(jsoncontext,"id","name") as (id,name) from t4;
企业级调优
Fetch抓取:在全局查找、字段查找、limit查找等都不走mapreduce
hive.fetch.task.conversion设置成 =》 more
本地模式
set hive.exec.mode.local.auto=true;
--设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,
--默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
--默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
--设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,
--默认为4
set hive.exec.mode.local.auto.input.files.max=5;
--默认为4
set hive.exec.mode.local.auto.input.files.max=5;
表的优化
小表、大表 join
将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率
使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce
大表 join 大表
空 key 过滤
空 key 转换
可以将表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上
map join
把小表全部加载到内存在map端进行join,避免reducer处理
set hive.auto.convert.join = true;
大表小表的阈值设置(默认25M一下认为是小表)
set hive.mapjoin.smalltable.filesize=25000000;
group By
开启Map端聚合参数设置
--是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
--在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
--有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
set hive.map.aggr = true;
--在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
--有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
count(distinct)
ount distinct使用先group by 再count的方式替换
--每个reduce任务处理的数据量 默认256000000(256M)
set hive.exec.reducers.bytes.per.reducer=32123456;
set hive.exec.reducers.bytes.per.reducer=32123456;
select count(distinct ip ) from log_text;
转换成
select count(ip) from (select ip from log_text group by ip) t;
转换成
select count(ip) from (select ip from log_text group by ip) t;
尽量避免笛卡尔积,即避免join的时候不加on条件,或者无效的on条件
使用分区剪裁、列剪裁
只获取需要的列的数据,减少数据输入
行过滤
并行执行
set hive.exec.parallel=true;
--同一个sql允许最大并行度,默认为8。
set hive.exec.parallel.thread.number=16;
set hive.exec.parallel.thread.number=16;
严格模式
--设置非严格模式(默认)
set hive.mapred.mode=nonstrict;
--设置严格模式
set hive.mapred.mode=strict;
set hive.mapred.mode=nonstrict;
--设置严格模式
set hive.mapred.mode=strict;
select * 不允许
对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行
对于使用了order by语句的查询,要求必须使用limit语句
限制笛卡尔积的查询
JVM重用
JVM实例在同一个job中重新使用N次。减少进程的启动和销毁时间
-- 设置jvm重用个数
set mapred.job.reuse.jvm.num.tasks=5;
set mapred.job.reuse.jvm.num.tasks=5;
推测执行
Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
--开启推测执行机制
set hive.mapred.reduce.tasks.speculative.execution=true;
set hive.mapred.reduce.tasks.speculative.execution=true;
压缩(参考上面的表数据压缩配置)
数据倾斜
合理设置Map数
如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
小文件合并
在map执行前合并小文件,减少map数
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
复杂文件增加Map数
--设置maxsize大小为10M,也就是说一个block的大小为10M
set mapreduce.input.fileinputformat.split.maxsize=10485760;
set mapreduce.input.fileinputformat.split.maxsize=10485760;
合理设置Reduce数
- 每个Reduce处理的数据量默认是256MB
set hive.exec.reducers.bytes.per.reducer=256000000;
set hive.exec.reducers.bytes.per.reducer=256000000;
- 每个任务最大的reduce数,默认为1009
set hive.exec.reducers.max=1009;
set hive.exec.reducers.max=1009;
- - 计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
N=min(参数2,总输入数据量/参数1)
--设置每一个job中reduce个数
set mapreduce.job.reduces=3;
set mapreduce.job.reduces=3;
级联求和 sql题目
zookeeper
入门简介
是什么
官方版
设计模式来理解
是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,
然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在
Zookeeper上注册的那些观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式
一句话
zookeeper=类似unix文件系统+通知机制+Znode节点
作用:服务注册+分布式系统的一致性通知协调
能干嘛
命名服务
配置维护
集群管理
分布式消息同步和协调机制
对Dubbo的支持
备注
Zoopkeeper提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构,
并对树中的节点进行有效管理,从而可以设计出多种多样的分布式的数据管理模型,作为分布式系统
的沟通调度桥梁
去哪下
官网
首页
https://zookeeper.apache.org/
下载
怎么玩
统一命名服务(Name Service如Dubbo服务注册中心)
配置管理(Configuration Management如淘宝开源配置管理框架Diamond)
Java操作API
安装配置
Linux下安装
官网下载安装包,本次版本zookeeper-3.4.9.tar.gz
拷贝进入到/opt目录下并解压
新建专属zookeeper目录,mkdir /myzookeeper,
随后将上一步解压的zookeeper内容拷贝进/myzookeeper目录内
进入conf文件夹,拷贝zoo_sample.cfg改为zoo.cfg
zoo.cfg解读
tickTime
initLimit
syncLimit
dataDir
clientPort
启动Zookeeper服务之前需要先安装好Java环境
开启服务+客户端连接
启动+关闭服务
CentOS6.8下面nc命令的安装
echo ruok | nc 127.0.0.1 2181
客户端连接
退出quit
永远的HelloWorld
查看+获得zookeeper服务器上的数据存储信息
文件系统
Zookeeper维护一个类似文件系统的数据结构
初识znode节点
数据模型/znode节点深入
Znode的数据模型
是什么
ZooKeeper的Stat结构体
czxid- 引起这个znode创建的zxid,创建节点的事务的zxid(ZooKeeper Transaction Id)
ctime - znode被创建的毫秒数(从1970年开始)
mzxid - znode最后更新的zxid
mtime - znode最后修改的毫秒数(从1970年开始)
pZxid-znode最后更新的子节点zxid
cversion - znode子节点变化号,znode子节点修改次数
dataversion - znode数据变化号
aclVersion - znode访问控制列表的变化号
ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
dataLength- znode的数据长度
numChildren - znode子节点数量
小总结
znode中的存在类型
PERSISTENT-持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
EPHEMERAL-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
基础命令和Java客户端操作
zkCli的常用命令操作
一句话:和redis的KV键值对类似,只不过key变成了一个路径节点值,v就是data
常用命令
help
ls
使用 ls 命令来查看当前znode中所包含的内容
ls2
查看当前节点数据并能看到更新次数等数据
stat
查看节点状态
set
设置节点的具体值
set 节点 value值
get
获得节点的值
get 节点
create
普通创建
-s
含有序列
-e
临时(重启或者超时消失)
delete
rmr
四字命令
是什么
常用
ruok:测试服务是否处于正确状态。如果确实如此,那么服务返回“imok ”,否则不做任何相应
stat:输出关于性能和连接的客户端的列表
conf:输出相关服务配置的详细信息
cons:列出所有连接到服务器的客户端的完全的连接 /会话的详细信息。包括“接受 / 发送”的包数量、会话id 、操作延迟、最后的操作执行等等信息
dump:列出未经处理的会话和临时节点
envi:输出关于服务环境的详细信息(区别于conf命令)
reqs:列出未经处理的请求
wchs:列出服务器watch的详细信息
wchc:通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表
wchp:通过路径列出服务器 watch的详细信息。它输出一个与 session相关的路径
小总结
Java客户端操作
Maven工程和配置POM
log4j.xml
Code
通知机制
watch
通知机制
客户端注册监听它关心的目录节点,
当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,
zookeeper会通知客户端。
是什么
观察者的功能
一句话:异步回调的触发机制
watch事件理解
一次触发
发往客户端
为数据设置watch
时序性和一致性
code
一次性
多次(命名服务)
Zookeeper集群
伪分布式单机配置
说明
initLimit 是Zookeeper用它来限定集群中的Zookeeper服务器连接到Leader的时限
syncLimit 限制了follower服务器与leader服务器之间请求和应答之间的时限
配置步骤
zookeeper-3.4.9.tar.gz解压后拷贝到/myzookeeper目录下并重新名为zk01,再复制zk01形成zk02、zk03,共计3份
进入zk01/02/03分别新建文件夹
mydata
mylog
分别进入zk01-zk03各自的conf文件夹
新建zoo.cfg
编辑zoo.cfg
设置自己的数据和log路径
dataDir=/myzookeeper/zk01/mydata
dataLogDir=/myzookeeper/zk01/mylog
修改各自的clientPort
在最后面添加server的列表
在各自mydata下面创建myid的文件,在里面写入server的数字
分别启动三个服务器
zkCli连接server,带参数指定-server
常见应用案例
命名服务(NameService)
前面所演示的每个nodepath就是一个命名服务
一句话path就是命名服务,客户端能够根据指定节点的名字来获取资源或者服务地址,然后进行下一步操作
软负载均衡
缓存中间件
MQ
kafka
流批一体实时技术
Flink
Flink 介绍
数据架构的衍变
传统数据架构
传统单体数据架构
微服务架构
大数据数据架构
批计算(离线)
实时计算
有状态流计算架构
状态:计算过程中产生的中间结构
每次计算新的数据进入到流系统中都是给予中间状态结果的基础上计算。
Flink的优势
支持高吞吐、低延迟、高性能
支持事件时间概念
支持有状态计算
支持高度灵活的窗口操作
基于轻量级分布式快照实现容错
给予JVM实现独立的内存管理
Save Point保存点
Flink应用场景
实时智能推荐
复杂事件处理
实时欺诈检测
实时数据仓库与ETL
流数据分析
实时报表分析
Flink 基本架构
基本组件栈
API&Libraries层
DataStream API
GEP(复杂事件处理库)
SQL&Table库
DataSetAPI
FlinkML
Gelly
SQL&Table库
Runtime核心层
负责对上层不同接口提供基础服务,是Flink分布式计算框架的核心实现层
物理部署层
本地
集群
Standalone
Yarn
云
GEC
EC2
Kubenetes
基本架构
遵循Master-Slave架构设计原则,主要包含JobManager、TaskManager两个组件
组件间的通信协议:Akka框架
组成
组成
Flink 客户端
负责提交任务到集群
与JobMananager建立Akka连接
JobMananager
集群任务的调度以及资源的管理
与Flink 客户端 、TaskManager通信,保证任务的执行
任务完成后,反馈结果给客户端并释放资源
TaskManager
负责具体执行任务和对应任务节点上的资源深情与管理
数据类型和序列化
Flink中的类型处理
通过引用字段名(如dataSet.keyBy("username")来使用pojo类型和分组/连接/聚合它们
类型信息允许Flink尽早检查(打印错误和类型兼容性),而不是在运行时失败。
Flink对数据类型的了解越多,序列化和数据布局方案就越好。
最后,它还使用户在大多数情况下不必担心序列化框架和必须注册类型。
最常见的问题
注册子类型(Registering subtypes)
如果函数签名只描述超类型,但是它们实际上在执行期间使用了这些超类型的子类型,那么让Flink意识到这些子类型可能会大大提高性能。
为此,在StreamExecutionEnvironment或ExecutionEnvironment上为每个子类型调用 .registerType(clazz)。
注册自定义序列化器(Registering custom serializers)
对于自己不能透明地处理的类型,Flink会返回到Kryo。
但是并不是所有类型都可以由Kryo(也就是由Flink)无缝地处理。
解决方案是为引起问题的类型注册额外的序列化器。
调用StreamExecutionEnvironment或ExecutionEnvironment上的.getConfigaddDefaultKryoSerializer(Class type, T serializer)
许多库中都提供了额外的Kryo序列化器。
添加类型提示(Adding Type Hints)
当Flink无法推断出泛型类型时,有时用户必须传递类型提示。
这通常只在Java API中需要。
手动创建类型信息(Manually creating a TypeInformation)
这对于某些API调用可能是必要的,因为Java的泛型类型擦除导致Flink无法推断数据类型。
TypeInformation
TypeInformation 是所有类型描述符的基类。
它揭示了类型的一些基本属性,可以生成序列化器,在专门化中,还可以生成类型的比较器。
Flink 数据类型
基本类型
所有Java原语及其装箱形式
void、String、Date、BigDecimal和BigInteger。
基本数组和对象数组
复合类型
Flink Java Tuples (Flink Java API的一部分)
最多25个字段,不支持空字段
Scala case类(包括Scala元组)
不支持空字段
Row
具有任意数量的字段和支持空字段的元组
POJOs
遵循某种类似于bean的模式的类
辅助类型
Option, Either, Lists, Maps, …
泛型
这些不会被Flink本身序列化,而是被Kryo序列化。
POJO类型的规则
类是公共的并且是独立的(没有非静态的内部类)
该类有一个公共的无参数构造函数
类(以及所有超类)中的所有非静态、非瞬态字段要么是公共的(和非final的),要么有一个公共的getter和setter方法,该方法遵循getter和setter的Java bean命名约定。
注意,当用户定义的数据类型不能被识别为POJO类型时,必须将其处理为GenericType并使用Kryo进行序列化。
DataSet API 介绍与使用
DataSet 概述
Flink中的DataSet 程序是实现DataSet 转换(如 filtering, mapping, joining, grouping)的常规程序。
DataSet 最初是从某些来源(例如,通过读取文件,或从本地集合)创建的。
结果通过sink返回,例如将数据写入(分布式)文件、写入标准输出(例如命令行终端)。
DataSet API
说明
DataSet API 用于处理批量数据,Flink将接入的数据转换成DataSet数据集
开发相关依赖包,见注释
一般开发流程
创建ExecutionEnvironment环境
读取数据
利用DataSet API 提供的Transformation进行数据转换
输出结果
Data Sources 数据接入
说明
数据源创建初始的DataSet,数据可能来自不同的数据源
创建DataSet的机制一般抽象于InputFormat
Flink提供了几种内置格式,用于从常见的文件格式创建数据集,其中许多可以在ExecutionEnvironment找到快捷方法
数据接入的种类
File-based 文件系统
readTextFile(path) / TextInputFormat
按行读取文件并将其作为DataSet[String]返回。
readTextFileWithValue(path) / TextValueInputFormat
按行读取文件并将其作为StringValues返回。StringValues是可变的字符串。
通过StringValue存储文本数据可以有效降低Strin对象创建数量,减少性能开销
readCsvFile(path) / CsvInputFormat
解析逗号(或其他字符)分隔字段的文件。
返回一个tuples或POJOs数据集。支持基本的java类型及其对应的值作为字段类型。
readFileOfPrimitives(path, Class) / PrimitiveInputFormat
使用给定的分隔符解析新行(或另一个字符序列)分隔的基本数据类型(如字符串或整数)。
readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat
使用给定的分隔符解析新行(或另一个字符序列)分隔的基本数据类型(如字符串或整数)。
readSequenceFile(Key, Value, path) / SequenceFileInputFormat
创建JobConf并使用类型SequenceFileInputFormat、Key类和Value类从指定路径读取文件,返回Tuple2<Key, Value>。
Collection-based
fromCollection(Seq)
从一个Seq创建一个数据集
fromCollection(Iterable)
从一个Iterable创建一个数据集。由Iterable返回的所有元素必须是相同类型的。
fromCollection(Iterator)
从一个Iterable创建一个数据集。由Iterator返回的所有元素必须是相同类型的。
fromElements(elements: _*)
根据给定的对象序列创建数据集。所有对象必须具有相同的类型。
fromParallelCollection(SplittableIterator)
从iterator并行地创建数据集。该类指定迭代器返回的元素的数据类型。
generateSequence(from, to)
并行地生成给定区间内的数字序列。
Generic 通用类型
说明
DataSet API中提供了InputFormat 通用的数据接口,以接入不同的数据源和格式类型的数据
接口类型
基于文件类型
readFile(inputFormat, path) / FileInputFormat
基于通用数据类型接口如:RDBMS、NoSQL
createInput(inputFormat) / InputFormat
接口
readFile(inputFormat, path) / FileInputFormat
自定义文件类型输入源,将指定格式文件读取并转换成DataSet
createInput(inputFormat) / InputFormat
自定义通用型数据源,将读取的数据转换成DataSet数据集
案例:读取mysql数据做为数据源
一些特殊输入案例
递归遍历输入路径目录
对于基于文件的输入,当输入路径是目录时,默认情况下不会遍历目录文件。
相反,只读取基本目录中的文件,而忽略嵌套文件。方法启用嵌套文件的递归枚举
读取压缩文件
Flink目前支持输入文件的透明解压,如果这些文件用适当的文件扩展名进行了标记。
特别是,这意味着不需要进一步配置输入格式,任何FileInputFormat都支持压缩,包括自定义输入格式。
压缩文件可能无法并行读取,从而影响作业的可伸缩性。
DataSet Transformations 数据集转换
说明
针对数据集的转换操作
转换的实质是将DataSet转换成另一个新的DataSet,然后将各个DataSet的转换连接成有向无环图,并基于DAG完成对批量数据的处理
数据处理
Map
获取一个元素并生成一个元素,数据分区不发生变化
FlatMap
获取一个元素并生成零个、一个或多个元素。包括空值
FlatMap
获取一个元素并生成零个、一个或多个元素。包括空值
获取一个元素并生成零个、一个或多个元素。包括空值
MapPartition
类似Map,MapPartition是基于DataSet分区对数据进行处理,以“iterator”的形式获取分区数据,并可以生成任意数量的结果值
每个分区中的元素数量取决于并行度和以前的操作
Filter
根据传入的条件对每条数据进行过滤,只有判定为Truede 的数据元素才会传输到下游的DataSet中
聚合操作
Reduce
通过将两个元素重复组合为一个元素,将一组元素组合为单个元素。
Reduce可以应用于完整的数据集,也可以应用于分组的数据集。
ReduceGroup
将一组元素组合成一个或多个元素。ReduceGroup可以应用于完整的数据集,也可以应用于分组的数据集。
Aggregate
将一组值聚合为单个值,可以看作是内置的reduce函数。
聚合可以应用于完整的数据集,也可以应用于分组的数据集。
可以对最小、最大和聚合使用简写语法。
Distinct
返回数据集中不同的元素。它从输入数据集中删除与元素的所有字段或字段子集相关的重复项。
多表关联
Join
根据指定条件关联两个数据集,然后根据所选字段形成一个新数据集
关联的key可以是key的表达式、Key-selector函数、字段位置以及Case Class字段指定
注意,连接转换只对等连接起作用。其他连接类型需要使用OuterJoin或CoGroup来表示。
可以根据Size Hint 标记数据及大小,Flink根据用户给定的线索调整计算策略
joinWithTiny
第二个数据集是小数据集
joinWithHuge
第二个数据集是大数据集
可以通过连接提示指定运行时执行连接的方式。描述了连接是通过分区还是广播进行的,以及它是使用基于排序的算法还是基于散列的算法。
JoinHint
BROADCAST_HASH_FIRST
将第一个数据集广播出去,并转换成HashTable存储,适合第一个数据集较小的情况
BROADCAST_HASH_SECOND
将第二个数据集广播出去,并转换成HashTable存储,适合第二个数据集较小的情况
OPTIMIZER_CHOOSES
与不设定JoinHint相同,优化的工作交给系统
REPARTITION_HASH_FIRST
将两个数据集重分区,将第一个数据集转换成HashTable存储,适用于第一个数据集比第二个数据集小,但是两个数据集都相对较大的情况
REPARTITION_HASH_SECOND
将两个数据集重分区,将第二个数据集转换成HashTable存储,适用于第二个数据集比第一个数据集小,但是两个数据集都相对较大的情况
REPARTITION_SORT_MERGE
将两个数据集重分区,并将每个分区排序,适用于两个数据集已经排好序的情况
OuterJoin
OuterJoin对两个数据集进行外关联
包含的关联方式
left
leftOuterJoin
right
rightOuterJoin
full outer join
fullOuterJoin
JoinHint
类似Join,但是并不是所有都支持,简单的总结就是左右关联外表支持广播、重分区哈希,全关联是不支持广播、重分区哈希
CoGroup
将两个数据集根据key组合在一起,相同的key会放在同一个Group中,如果制定的key尽在一个数据集中有记录,则会将Group 与空的Group 关联
Cross
将两个数据集合并成一个数据集,合并的的及结果是两个数据集的笛卡尔积
集合操作
Union
生成两个数据集的并集,两个数据集的格式必须相同
Rebalance
对数据集中的数据进行平均分布,使每个分区上的数据量相同,以消除数据倾斜
只有Map-like 的转换才可能遵循再平衡转换。
Hash-Partition
按给定键对数据集进行哈希分区。键可以指定为位置键、表达式键和键选择器函数。
Range-Partition
给定键上对数据集进行范围分区
Sort Partition
按指定顺序对指定字段上的数据集的所有分区进行本地排序。
Custom Partitioning
使用自定义分区器函数基于特定分区的键分配记录。
排序操作
First-n
返回数据集的前n个(任意)元素。first -n可以应用于常规数据集、分组数据集或分组排序的数据集。
MinBy / MaxBy
从数据集中返回指定字段或组合对应的最小或最大的记录
Data Sinks
数据接收器使用数据集并用于存储或返回它们。数据接收操作使用OutputFormat进行描述
三中数据输出类型
基于文件输出接口
writeAsText() / TextOutputFormat
将元素按行写入为字符串。通过调用每个元素的toString()方法获得字符串。
writeAsCsv(...) / CsvOutputFormat
将元组写入逗号分隔的值文件。行和字段分隔符是可配置的
write() / FileOutputFormat
方法和自定义文件输出的基类。支持自定义对象到字节的转换。
通用输出接口
output()/ OutputFormat
使用自定义的OutputFormat来实现输出
HadoopOutputFormat
JDBCOutputFormat
. . .
客户端输出
print() / printToErr()
打印标准输出/标准错误流中每个元素的toString()值。
Iteration Operators 迭代计算
全量迭代
过程
Iteration Input
初始化数据,通过DataSource算子读取或从其他转换中接入
Step Function
Step Function将在每次迭代中执行,它是由map、reduce、join等操作符组成的任意数据流,取决于您手头的特定任务。
将结合数据集以及上一次迭代计算的Solution数据集进行本次迭代计算
Next Partial Solution
每次迭代计算的输出结果称为Next Partial Solution,该结果会被作为下一次迭代计算的输入数据
Iteration Result
最后一次迭代的输出被写入DataSink,或用作以下操作符的输入。
迭代终止的两种条件
达到最大迭代次数
指定迭代的最大次数,当计算次数超过该设定阀值,终止迭代
符合自定义聚合器收敛条件
用户自定义的聚合器和收敛条件
增量迭代
过程
Iteration Input
从数据源读取initial workset或从以前的操作符中读取solution set,作为第一次迭代的输入。
Step Function
Step Function将在每次迭代中执行
Next Workset/Update Solution Set
Next Workset驱动迭代计算,并将反馈到下一次迭代,Solution Set将被更新并隐式转发(不需要重新构建),两个数据集都可以由不同Step Function更新。
Iteration Result
DataStream API
DataStream 编程模型
DataSource数据接入
内置数据源
文件数据源
readTextFile
直接读取
readFile
指定文件的InputFormat读取
FileProcessingMode
Socket数据源
socketTextStream
集合数据源
将java、scala的集合类(Collection)转换成DataStream数据集
本质:将本地集合中的个数据分发到远端并行执行的节点中
fromElements
Java 数组
Java List
外部数据源
数据源连接器
KAFKA
netty
Twitter
hdfs
...
自定义数据源连接器
DataStream 转换操作
从一个或多个DataStream生成新的DataStream 的过程称为转换
转换过程中每种操作类型被称定义为不同的Operator,不同的转换可以组成一个DataFlow拓扑
转换类型
Simgle-DataStream
Map[DataStream->DataStream]
常用作对数据集内的数据进行清洗和转换
FlatMap[DataStream->DataStream]
用于一个输入产生一个或多个元素的场景。比如wordCount
Filter[DataStream->DataStream]
筛选符合条件的数据
KeyBy[DataStream->KeyedStream]
将数据根据指定的key从DataStream->KeyedStream
对数据集进行partition操作,将key相同的数据放置在相同的分区中
不能使用KeyBy对数据充分去的情况
使用POJOs类型数据,但是没有复写hashCode()
任何数据类型的数组结构
Aggregations[KeyedStream->DataStream]
根据指定的算子进行聚合操作,滚动的产生一些列数据聚合的结果
本质上是对Reduce中的函数进行了封装
封装的函数如下:sum,min,minBy,max,maxBy等,所有的都可以通过.reduce()实现
Multi-DataStream
Union[ DataStream-> DataStream]
将两个或多个输入的数据集合并成一个数据集
要保证两个数据集的格式一致
Connect、CoMap、CoFlatMap[DataStream->DataStream]
合并两种或多种不同数据类型的数据集
就是join的各种实现
keyBy()
将key相同的数据路由到同一个Operator中
broadcast()
在计算之前将该数据集广播到所有并行计算的Operator中
案例
Split[DataStream -> SplitStream], Select[SplitStream -> DataStream]
将数据集安条件进行拆分,形成一个SplitStream
仅仅是对数据进行了标记,给元素指定了一个标记,并没有真正的拆分
配合select达到Fliter的效果,某些时候也可以看作是Union的逆过程
案例
iterate
迭代相关操作(后续补充)
物理分区操作
根据指定的分区策略将数据重新分配到不同节点的Task实例上执行
常见分区策略
Random Partition-rebalance
将数据随机分配到下游的每个分区中。
分区相对均衡
容易失去原有数据结构
RoundRobin Partition-rebalance
通过循环的方式对数据进行重分区,尽可能保证每个分区数据平衡
此方法比较有效,但是也容易失去原有的数据结构
Rescale Partition-rescale
rescale 和 rebalance 类似,也可以将数据以 round-robin 的方式发送给下游任务
rebalance 会将数据发送发送给所有下游任务
rescale 仅会对上下游继承的算子数据进行重平衡
广播操作-broadcast
复制所有的输入数据,然后将所有数据都复制到下游并行的Task实例中。
下有算子中的Task可以直接从本地内存中获取广播数据集,不再需要依赖于网络传输
适合小数据集,比如大表关联小表
自定义分区- partitionCustom
当所有分区策略都不满意时,可以自定义分区策略。
partitionCustom() 方法接收一个 Partitioner 对象
我们只需要实现一个 Partitioner,定义我们自己的分区策略
DataSink 数据输出
将DataStream数据输出到外部系统的过程被定义为DataSink操作
输出类型
基本数据输出
实现过程中不需要依赖其他第三方库
writeAsCsv
writeAsText
writeToSocket
print
第三方数据输出
通过第三方连接器输出到外部系统中
时间概念与WaterMark
时间的三种概念
事件生成时间(Event Time)
事件时间是每个事件在其生产设备上发生的时间。这个时间通常是在记录进入Flink之前嵌入的,可以从每个记录中提取事件时间戳。
时间的进展取决于数据本身,而不是其他的时钟
事件时间程序必须指定如何生成事件时间水印( Event Time Watermarks),这是事件时间进展的信号机制。
除非知道事件是按顺序到达的(通过时间戳),否则在等待无序事件时,事件时间处理会导致一些延迟。由于只能等待有限的一段时间,这就限制了应用程序的确定性事件时间。
事件接入时间(Ingestion Time)
摄入时间是事件进入Flink的时间,在源操作符中,每个记录以时间戳的形式获取源的当前时间
摄入时间概念上介于事件时间和处理时间之间。
与处理时间相比,它稍微好资源一些,但是提供了更可预测的结果。摄取时间使用稳定的时间戳(在源处分配一次),对记录的不同窗口操作将引用相同的时间戳,而在处理时间中,每个窗口操作可以将记录分配到不同的窗口。
与事件时间相比,摄取时间程序不能处理任何无序的事件或延迟的数据,但程序不必指定如何生成水印。
事件处理时间(Processing Time)
当流程序在 processing time上运行时,所有基于时间的操作(如windows)将使用运行各个操作符的机器的系统时钟。
处理时间是最简单的时间概念,不需要流和机器之间的协调。
它提供了最好的性能和最低的延迟。
在分布式和异步环境中,处理时间并不能提供确定性,它容易受到记录到达系统的速度(例如,从消息队列)、记录在系统内部操作符之间流动的速度以及停机(计划的或其他方式)的影响。
设定时间特性
Flink DataStream程序的第一部分通常设置基本时间特性。
Event Time and Watermarks
支持事件时间的流处理器需要一种方法来度量事件时间的进度。告诉窗口操作属于该窗口的数据已经全部接收,以便关闭正在运行的窗口。
事件时间可以独立于处理时间
分配时间戳
直接在数据流源中分配
要直接向源中的元素分配时间戳,源必须使用SourceContext上的collectWithTimestamp(…)方法。要生成水印,源程序必须调用emitWatermark(Watermark)函数。
通过 timestamp assigner / watermark generator
时间戳分配程序获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经具有时间戳和/或水印,则timestamp assigner将覆盖它们。
Timestamp assigners程序通常在数据源之后立即指定,但并不严格要求这样做。
watermarks的生成方式
在Flink中测量事件时间进展的机制是水印(watermarks)。
周期性的(With Periodic Watermarks)
周期性的触发watermark的生成和发送,默认是100ms
时间间隔由ExecutionConfig.setAutoWatermarkInterval 决定
每隔N秒自动向流里注入一个WATERMARK
每次调用getCurrentWatermark 方法, 如果得到的WATERMARK,不为空并且比之前的大就注入流中
可以定义一个最大允许乱序的时间,这种比较常用
实现AssignerWithPeriodicWatermarks接口
根据某些特殊条件(With Punctuated Watermarks)
基于某些事件触发watermark的生成和发送
基于事件向流里注入一个WATERMARK,每一个元素都有机会判断是否生成一个WATERMARK.
如果得到的WATERMARK 不为空并且比之前的大就注入流中
实现AssignerWithPunctuatedWatermarks接口
Timestamps per Kafka Partition
使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式,当使用来自Kafka的流时,多个分区经常被并行地使用,交叉使用来自分区的事件并破坏每个分区的模式
这种情况下,可以使用Kafka-partition-aware watermark generation,该特性,可以在Kafka使用者内部生成每个Kafka分区的水印,每个分区的水印合并的方式与在流变换中合并水印的方式相同。
Windows计算
Windows Assigner
Keyed?
Keyed
根据key 分时统计,某一个段,某用户登陆的网站数量
No-Keyed
全量分时统计,如:某一时段网站所有请求数
窗口类型
基于时间
滚动窗口
Event Time
window
timeWindow
Process Time
window
timeWindow
滑动窗口
Event Time
window
timeWindow
Process Time
window
timeWindow
会话窗口
Event Time
withGap
withDynamicGap
Process Time
withGap
withDynamicGap
全局窗口
GlobalWindows
基于数量
自定义窗口分配器
根据自己的需要进行数据的的窗口划分
需要实现
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner#WindowAssigner
实现案例
Windows Function
增量聚合
ReduceFunction
AggregateFunction
FoldFunction
全量聚合
ProcessWindowFunction
pv、uv统计demo
Incremental Aggregation + ProcessWindowFunction
求窗口中指标的最大值、窗口终止时间
Trigger 窗口触发器
EventTimeTrigger
通过Watermark和EndTime确定是否触发窗口
ProcessingTimeTrigger
通过ProcessTime和窗口EndTime确定是否触发窗口
ContinuousEventTimeTrigger
基于给定时间间隔 或 EndTime小于Watermark 时触发窗口
ContinuousProcessingTimeTrigger
指定的时间间隔触发窗口
CountTrigger
根据接入的数据量是否超过设定的阀值确定是否触发窗口
DeltaTrigger
提供 delta 函数和历史 datapoint 存储,每个元素消费时触发 delta 函数计算
PurgingTrigger
NeverTrigger
StateCleaningCountTrigger
继承Trigger 自定义触发器
实现
org.apache.flink.streaming.api.windowing.triggers.Trigger#Trigger
案例
按照指定周期时间或数据量进行窗口的触发计算
子主题
数据剔除器 Evictor
CountEvictor
DeltaEvictor
TimeEvictor
实现Evictor自定义剔除器
延迟数据处理
flink 虽然提供了watermark机制,但是但是却只能在一定程度上解决数据乱序的问题。
某些情况下数据的延迟可能会非常严重,watermark 机制也无法保证所有数据都进入了窗口,而这些数据将会被丢弃
flink 使用allowedLateness来决定是否对延迟数据进行处理,传入的时间Time(T) 表示允许延迟的最大时延
allowedLateness与watermark
allowedLateness
flink窗口计算过程中会将窗口的结束时间加上T,座位窗口最后被释放的结束时间P
当接入事件的EventTime不超过P,但Watermark超过窗口的结束时间,直接触发窗口计算
如果接入事件的EventTime大于P,则对数据进行丢弃处理
watermark
当接入事件的EventTime不超过P,但Watermark超过窗口的结束时间,直接触发窗口计算
Table API & SQL
概述
简介
FLink提供两个关系型API用于统一流处理和批处理,即:Table API & SQL
Table API 是用于 scala 和 java 的语言继承API,允许以一种非常直观地方式组合来自关系的操作符(如选择、筛选、关联等)
SQL是基于Apache Calcite实现的SQL标准,让我们能够以SQL的形式简化开发的难度
无论是批处理输入(DataSet)还是流出里输入(DataStream),在这两个接口中具有相同语义的查询具有相同的结果
构建依赖
Planner
Planner:负责将关系操作符转换为可执行的、优化的Flink作业。
Planner分类
说明
从Flink 1.9开始,Flink提供了两种不同的planner实现,用于评估Table & SQL API程序(Blink planner、Old planner)
这两种计划器都提供了不同的优化规则和运行时类。它们还可能在支持的特性集上有所不同。
Old planner
Blink planner
两种planner的不同
Blink将批处理作业视为流的一种特殊情况。所以不支持 Table and DataSet的转换,
batch 作业将不会转换为DateSet程序,而是转换为DataStream程序,与流作业相同。
batch 作业将不会转换为DateSet程序,而是转换为DataStream程序,与流作业相同。
Blink planner不支持BatchTableSource,使用有界StreamTableSource 代替它。
Blink planner只支持全新的目录,不支持已被弃用的ExternalCatalog。
二者的FilterableTableSource实现是不兼容的:
Old planne 把PlannerExpressions下推到FilterableTableSource;
而Blink planner 将之下推到 Expressions
Old planne 把PlannerExpressions下推到FilterableTableSource;
而Blink planner 将之下推到 Expressions
仅Blink planner 支持基于key-value的配置操作
PlannerConfig 在两者中的实现是不同的
Blink planne将多个接收器优化为一个DAG(仅在TableEnvironment上支持,而在StreamTableEnvironment上不支持)。
Old planner总是优化每个下沉到一个新的DAG,所有DAG是相互独立的。
Old planner总是优化每个下沉到一个新的DAG,所有DAG是相互独立的。
使用流程
创建执行环境(TableEnvironment)
简介
TableEnvironment是Table API and SQL集成的核心概念,主要有一下功能
1. 在内部目录中注册一个表
2. 注册外部目录
3. 执行sql 查询
4. 注册用户自定义方法(scalar, table, or aggregation)
5. 将一个DataStream or DataSet转换成Table
6. 持有对ExecutionEnvironment或StreamExecutionEnvironment的引用
1. 在内部目录中注册一个表
2. 注册外部目录
3. 执行sql 查询
4. 注册用户自定义方法(scalar, table, or aggregation)
5. 将一个DataStream or DataSet转换成Table
6. 持有对ExecutionEnvironment或StreamExecutionEnvironment的引用
创建TableEnvironment
注册表
简介
TableEnvironment维护按名称注册的表的目录。
表的类型
输入表
输入表可以在Table API和SQL查询中引用,并提供输入数据
输入表可以来自多种数据源:
1. 现有的Table 对象,通常是Table API或SQL查询的结果。
2. 来自TableSource,它访问外部数据,如文件、数据库或消息传递系统
3. 来自流处理或批处理的 DataStream 或 DataSet 转换而来
1. 现有的Table 对象,通常是Table API或SQL查询的结果。
2. 来自TableSource,它访问外部数据,如文件、数据库或消息传递系统
3. 来自流处理或批处理的 DataStream 或 DataSet 转换而来
输出表
输出表可用于将Table API或SQL查询的结果发送到外部系统。
注册Table
实现
注册TableSource
TableSource 提供对存储在存储系统中的外部数据的访问:
比如数据库(MySQL, HBase,…);
具有特定编码的文件(CSV, Apache [Parquet, Avro, ORC],…);
或者消息传递系统(Apache Kafka, RabbitMQ,…)。
比如数据库(MySQL, HBase,…);
具有特定编码的文件(CSV, Apache [Parquet, Avro, ORC],…);
或者消息传递系统(Apache Kafka, RabbitMQ,…)。
实现
注册TableSink
TableSink 可以将 Table API or SQL 的查询结果提交到外部存储系统:
如:database、key-value 存储、消息队列、或文件系统(在不同的编码中,如CSV、Apache [Parquet, Avro, ORC],…)。
如:database、key-value 存储、消息队列、或文件系统(在不同的编码中,如CSV、Apache [Parquet, Avro, ORC],…)。
实现
注册一个外部目录(可选)
外部目录可以提供关于外部数据库和表的信息:
比如它们的名称、模式、统计信息以及如何访问存储在外部数据库、表或文件中的数据的信息。
比如它们的名称、模式、统计信息以及如何访问存储在外部数据库、表或文件中的数据的信息。
一旦在TableEnvironment中注册,在ExternalCatalog中定义的所有表都可以通过表API或SQL查询指定它们的完整路径来访问:
比如catalog.database.table。
比如catalog.database.table。
实现
注意: Blink planner 目前不支持外部目录
查询表
Table API
简介
Table API 是用于 scala和java语言的集成查询API,对比SQL,它采用的是一步一步组合的形式,而不是一句sql字符串语句
Table API 文档中描述的所有Table API operations 支持流处理和批处理
案例
注意:Scala Table API 使用Scala符号(')来引用标的属性(字段);
Table API 使用了Scala implicits,确保导入了一下依赖:
org.apache.flink.api.scala._
org.apache.flink.table.api.scala._
以使用scala的隐式转换
Table API 使用了Scala implicits,确保导入了一下依赖:
org.apache.flink.api.scala._
org.apache.flink.table.api.scala._
以使用scala的隐式转换
SQL
简介
Flink的SQL集成基于Apache Calcite,它实现了SQL标准。SQL查询被指定为常规字符串。
译文:SQL文档描述了Flink对流和批处理表的SQL支持。
案例
这两个API 可以混用,返回的本质都是Table objects
提交(发出)一个表
简介
Table API通过把数据写入TableSink将之提交出。
TableSink是一个通用接口:
支持各种文件格式(例如CSV、Apache Parquet、Apache Avro)、
存储系统(例如JDBC、Apache HBase、Apache Cassandra、Elasticsearch)
或消息传递系统(例如Apache Kafka、RabbitMQ)。
TableSink是一个通用接口:
支持各种文件格式(例如CSV、Apache Parquet、Apache Avro)、
存储系统(例如JDBC、Apache HBase、Apache Cassandra、Elasticsearch)
或消息传递系统(例如Apache Kafka、RabbitMQ)。
其中batch Table 仅仅可以写入BatchTableSink,
而streaming Table可从(AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.)中三选一
而streaming Table可从(AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.)中三选一
Table.insertInto(String tableName) 将Table 发送到注册的TableSink;
该方法会通过名称从目录中查询TableSink 并校验Table 的schema 是否于TableSink完全一致
该方法会通过名称从目录中查询TableSink 并校验Table 的schema 是否于TableSink完全一致
案例
转换或执行查询的流程
Old planner
说明
Table API and SQL 查询根据输入类型会被转换成 DataStream 或 DataSet,
在内部,查询表示为一个查询逻辑计划,并且通过两个阶段进行转换
在内部,查询表示为一个查询逻辑计划,并且通过两个阶段进行转换
转换过程
优化查询逻辑计划
将逻辑计划转换成DataStream or DataSet program
Table API or SQL 查询转换的时机
将Table提交到一个TableSink的时候,比如:调用Table.insertInto() 的时候
当制定一个SQL更新查询的时候,比如: 调用TableEnvironment.sqlUpdate() 的时候
将Table 转换成DataStream or DataSet的时候(后续介绍)
转换完成
一旦转换完成, 处理Table API or SQL 查询就跟处理常规DataStream or DataSet 程序一样,
当调用StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute()的时候执行该查询
当调用StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute()的时候执行该查询
Blink planner
说明
不管输入数据是什么类型,Table API and SQL都会被转换成 DataStream programs,
在内部,与Old planner类似,查询表示为一个查询逻辑计划,并且通过两个阶段进行转换
在内部,与Old planner类似,查询表示为一个查询逻辑计划,并且通过两个阶段进行转换
转换过程
优化查询逻辑计划
将逻辑计划转换成 DataStream (or DataSet) program
Table API or SQL 查询转换的时机
说明
TableEnvironment和StreamTableEnvironment,转换查询的行为是不同的。
StreamTableEnvironment
将Table提交到一个TableSink的时候,比如:调用Table.insertInto() 的时候
当制定一个SQL更新查询的时候,比如: 调用TableEnvironment.sqlUpdate() 的时候
将Table 转换成DataStream or DataSet的时候(后续介绍)
TableEnvironment
当执行TableEnvironment.execute()的时候会进行查询转换,
因为TableEnvironment会将多个sink优化为一个dag
因为TableEnvironment会将多个sink优化为一个dag
转换完成
一旦转换完成, 处理Table API or SQL 查询就跟处理常规DataStream程序一样,
当调用TableEnvironment.execute() or StreamExecutionEnvironment.execute()的时候执行该查询
当调用TableEnvironment.execute() or StreamExecutionEnvironment.execute()的时候执行该查询
Table API & SQL 与
DataStream & DataSet API 的集成
DataStream & DataSet API 的集成
说明
Table API and SQL的查询很容易与 DataStream and DataSet 程序集成
通过这种方式,我们可以对一些external table (for example from a RDBMS),做一些预处理,比如过滤、聚合、关联元数据等操作
然后使用构建在DataStream or DataSet API上的库 比如(CEP or Gelly)
然后使用构建在DataStream or DataSet API上的库 比如(CEP or Gelly)
DataStream or DataSet -> Table
直接将DataStream or DataSet 注册为Table
通过TableEnvironment可以将一个DataStream or DataSet注册为Table
Table 的 schema 依赖于DataStream or DataSet的数据类型
Table 的 schema 依赖于DataStream or DataSet的数据类型
案例
将DataStream or DataSet转换成Table
除了通过TableEnvironment进行转换,还可以直接将DataStream or DataSet转换成Table
案例
Table -> DataStream or DataSet
说明
Table 可以转换成DataStream or DataSet,由此,Table的查询结果可以使用DataStream or DataSetAPI
注意,在进行Table -> DataStream or DataSet的转换的时候,我们需要指定DataStream or DataSet的数据类型
即:表中行需要转换的数据类型,通常最方便的是类型设为Row
即:表中行需要转换的数据类型,通常最方便的是类型设为Row
常用可选类型的特性
Row
字段按位置映射,任意数量的字段,支持空值,没有类型安全的访问
POJO
字段按名称(POJO字段必须命名为表字段)、任意数量的字段、支持空值、类型安全的访问进行映射。
Case Class
字段按位置映射,不支持空值,类型安全访问。
Tuple
字段按位置映射,限制为22 (Scala)或25 (Java)字段,不支持空值,类型安全访问。
Atomic Type
表必须有单个字段,不支持空值,类型安全的访问。
Table -> DataStream
说明
将一个Table转成streaming query是一个动态更新的过程,当一个新的记录到达查询的输入流的时候,它会发生变化
所以,此类动态查询转而来的DataStream 需要对Table的更新进行编辑
所以,此类动态查询转而来的DataStream 需要对Table的更新进行编辑
Table -> DataStream的两种模式
Append Mode
只有当 dynamic Table仅通过插入更改进行修改时记录,才能使用此模式
也就是说,它只用于追加,以前发出的结果永远不会更新。
Retract Mode
这个模式可以一直使用。它使用布尔标志对插入和删除更改进行编码。详情后续介绍
案例
Table -> DataSet
案例
数据类型到Table Schema的映射
说明
Flink的DataStream和DataSet api支持非常多样化的类型。如:
Tuples (built-in Scala and Flink Java tuples),
POJOs,
Scala case classe,
Flink’s Row
允许使用包含多个字段的嵌套数据结构,这些字段可以在表表达式中访问。
其他的类型被视为原子类型(atomic)
Tuples (built-in Scala and Flink Java tuples),
POJOs,
Scala case classe,
Flink’s Row
允许使用包含多个字段的嵌套数据结构,这些字段可以在表表达式中访问。
其他的类型被视为原子类型(atomic)
在我们进行DataStream -> Table转换的时候,这些数据类型就会转转成内部行的形式表示
两种数据类型转换内部行的方式
Position-based
说明
基于位置的映射可以位子段提供更有意义的名称,同时保持字段的顺序。
可用于定义字段顺序的复合数据类型以及原子类型。
如:tuples, rows, and case classes之类具有顺序字段的类型
如:tuples, rows, and case classes之类具有顺序字段的类型
POJO类型必须使用field names进行映射
注意:当我们使用基于位置的映射关系的时候,指定的名称不能是原本数据类型中存在的名称
否则,API将会以基于名称的形式进行映射
如果没有指定名称,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用f0。
否则,API将会以基于名称的形式进行映射
如果没有指定名称,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用f0。
案例
Name-based
说明
基于名称的方式可以映射任意数据类型,包括POJOs。
是定义Table Schema 最灵活的方式,
映射的字段可以安名称引用、也可以通过 as 进行重命名,同时调整字段的顺序
是定义Table Schema 最灵活的方式,
映射的字段可以安名称引用、也可以通过 as 进行重命名,同时调整字段的顺序
注意:如果没有指定字段名,则使用复合类型的默认字段名和字段顺序,对于原子类型则使用f0。
案例
数据类型映射
Atomic Types
说明
Flink将原语(Integer, Double, String)或泛型类型(不能被分析和分解的类型)视为atomic types。
atomic type的DataStream or DataSet被转换成具有单个属性的Table
属性的类型是从原子类型推断出来的,也可以指定属性的名称
案例
Tuples (Scala and Java) 、
Case Classes (Scala only)
Case Classes (Scala only)
说明
Flink支持Scala的内置元组并未Java提供了自己的元组类,
这两种元组的 DataStreams and DataSets都可以转换成Table
这两种元组的 DataStreams and DataSets都可以转换成Table
默认采用基于位置的映射模式,当默认字段名被引用,API则会采用基于名称的映射方式,此时可以对字段通过as重命名和排列
案例
POJO (Java and Scala)
说明
Flink 以复合数据类型的方式支持POJOs
对POJO DataStream or DataSet转换成Table、并且没有指定字段名称时,
将会使用原始POJO字段名。
名称映射需要原始名称,不能用基于位置的映射。
将会使用原始POJO字段名。
名称映射需要原始名称,不能用基于位置的映射。
可以通过as重命名和重现排列字段顺序
案例
Row
说明
Row数据类型支持任意数量的字段和具有空值的字段
字段名可以通过RowTypeInfo指定
也可以在将 Row DataStream or DataSet转换成Table的时候指定
也可以在将 Row DataStream or DataSet转换成Table的时候指定
支持按位置和名称映射字段
字段可以通过为所有字段提供名称来重命名(基于位置的映射)
或者单独选择用于投影/排序/重命名的字段(基于名称的映射)
案例
总结
一般来说,只要你映射到Table的字段名不是原本DataStream or DataSet中的名称,都是基于位置映射的,此时需要注意顺序
当映射到Table 的字段名是原本DataStream or DataSet中的名称时候,是基于名称映射的,此时可以调整顺序,重命名等。
当映射到Table 的字段名是原本DataStream or DataSet中的名称时候,是基于名称映射的,此时可以调整顺序,重命名等。
查询优化
Old planner
说明
Flink利用Apache Calcite来优化和转换
目前Old planner支持的优化包括:投影(此处应该是投影消除)和过滤下推、子查询解关联以及其他类型的查询重写。
Old planner还没有优化joins的顺序(瞄了一下1.11好像也没有)
而是按照查询中定义的顺序执行它们(FROM子句中的表的顺序和/或WHERE子句中的连接谓词的顺序)。
而是按照查询中定义的顺序执行它们(FROM子句中的表的顺序和/或WHERE子句中的连接谓词的顺序)。
通过提供CalciteConfig对象,可以调整在不同阶段应用的优化规则集。
Blink planner
说明
Apache Flink利用并扩展了Apache Calcite来执行复杂的查询优化。
这包括一系列规则和基于成本的优化
这包括一系列规则和基于成本的优化
基于 Apache Calcite的子查询解关联
投影裁减(消除)
分区裁减
过滤下推
子计划重复数据删除以避免重复计算
特殊子查询重写,包括两部分
将IN and EXISTS转换成left semi-joins
将 NOT IN and NOT EXISTS转换成left anti-join
可选join重排
通过启用table.optimizer.join-reorder-enabled
除了执行计划,
优化器还会根据数据源提供的丰富可用统计数据和
每个细粒度的操作比如(io, cpu, network, and memory.)做出智能决策
优化器还会根据数据源提供的丰富可用统计数据和
每个细粒度的操作比如(io, cpu, network, and memory.)做出智能决策
通过提供CalciteConfig对象,可以调整在不同阶段应用的优化规则集。
Explaining a Table
说明
flink 也提供了explain方法供我们查看表的逻辑计划和优化我们的查询
通过TableEnvironment.explain(table)获得对应表查询计划
注意在1.9中只有Blink planner才有explain
explain的内容
关系查询的抽象语法树,即未优化的逻辑查询计划,
优化的逻辑查询计划
实际执行计划
案例
Dynamic Tables
说明
Dynamic table是Flink’s Table API and SQL支持流数据的核心概念,
与表示批数据的静态表不同,动态表是随时间变化的。
查询动态表会产生连续的查询,并且产生一个动态表作为结果。
查询会不断地更新动态结果表一响应输入表的改变。
简单的可以理解为:动态表上的拉in徐查询类似定义一个物化视图的查询。
与表示批数据的静态表不同,动态表是随时间变化的。
查询动态表会产生连续的查询,并且产生一个动态表作为结果。
查询会不断地更新动态结果表一响应输入表的改变。
简单的可以理解为:动态表上的拉in徐查询类似定义一个物化视图的查询。
streams, dynamic tables, and continuous queries的关系:
stream -> dynamic table -> continuous query(state) -> dynamic table -> stream
stream -> dynamic table -> continuous query(state) -> dynamic table -> stream
stream 被转换成 dynamic table
对动态表进行持续查询并产生应新的动态表
生成的动态表被转换回stream
查询的限制
状态的大小
连续查询是在无界流上计算的,通常需要运行数周或数月。
因此,连续查询处理的数据总量可能非常大。
必须更新以前发出的结果的查询需要维护所有发出的行,以便能够更新它们。
因此,连续查询处理的数据总量可能非常大。
必须更新以前发出的结果的查询需要维护所有发出的行,以便能够更新它们。
例如我们又一个网站浏览数据,需要统计每个用户访问url的数量,那么我们就需要记录每个用户url的技术,
以便在输入表接收到新行时能够增加计数并发送新的结果。
如果只跟踪注册用户(本身注册用户数不多的情况下),则要维护的计数数可能不会太高
以便在输入表接收到新行时能够增加计数并发送新的结果。
如果只跟踪注册用户(本身注册用户数不多的情况下),则要维护的计数数可能不会太高
计算更新
有些查询需要重新计算和更新大部分发出的结果行,即使只添加或更新了一条输入记录。
显然,这样的查询不适合作为连续查询执行。
显然,这样的查询不适合作为连续查询执行。
例如,我们要根据某个用户最后一次操作时间进行排序,
一旦click表接收到新行,就会更新用户的lastAction,并且必须计算新的排行。
但是,由于两个行不能具有相同的排序,所以所有较低的行也需要更新。
一旦click表接收到新行,就会更新用户的lastAction,并且必须计算新的排行。
但是,由于两个行不能具有相同的排序,所以所有较低的行也需要更新。
动态表到流的转换
说明
与常规数据库表一样,动态表可以通过插入、更新和删除更改不断修改。
它可能是一个具有单行(经常更新)的表,也可能是一个只有插入的表,没有更新和删除修改,或者介于两者之间的任何东西。
它可能是一个具有单行(经常更新)的表,也可能是一个只有插入的表,没有更新和删除修改,或者介于两者之间的任何东西。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。
三种编码方式
Append-only stream
仅通过插入更改修改的动态表可以通过发出插入的行来转换为流。
Retract stream
retract stream包含两中类型的信息
add messages
将INSERT更改编码为add messages
将更新(新)行的UPDATE change编码为add message
retract messages
将DELETE更改编码为retract messages
将更新(旧)行的UPDATE change编码为retract messages
注意
UPDATE change 此处是分为两个消息编码的,与Upsert stream形成对比
Upsert stream
说明
转换成upsert stream需要一个唯一键(也许是复合键)
流消费操作符需要知道唯一的键属性,以便正确应用消息。
包含两种信息
upsert messages
将INSERT and UPDATE更改编码成upsert messages
delete messages
DELETE 更改编码成delete messages
与Retract stream 区别
UPDATE changes通过单个消息编码,所以比retract stream更高效
时间属性
说明
在 Table API and SQL中,基于时间的操作(window)都需要关于时间的概念及其起源的信息
因此,tables 可以提供逻辑时间属性,用于指示时间和访问表程序中相应的时间戳。
定义时间属性的时机
从DataStream创建table的时候
使用TableSource预定义tables的时候
一旦在开始时定义了时间属性,就可以将其引用为字段,并可在基于时间的操作中使用。
注意
只要时间属性没有被修改,只是从查询的一个部分转发到另一个部分,它仍然是一个有效的时间属性。
时间属性的行为类似于常规的时间戳,可以用于计算。如果在计算中使用了时间属性,那么它将被物化并成为一个常规的时间戳。
常规的时间戳与Flink的时间和水印系统不兼容,因此不能再用于基于时间的操作。
三种时间属性
Processing time
说明
Processing time允许table program根据本地机器的时间产生结果。
它是最简单的时间概念,但不提供决定论。它既不需要提取时间戳,也不需要生成水印。
定义方法
DataStream-to-Table转换的时候
处理时间属性是在模式定义期间用.proctime属性定义的。
time属性只能通过附加的逻辑字段扩展物理模式。因此,只能在模式定义的末尾定义它。
使用TableSource
processing time 属性通过实现DefinedProctimeAttribute 接口的TableSource实现
逻辑时间属性附加到由表源的返回类型定义的物理模式中。
Event time
说明
Event time 采用记录中制定的时间,可以保证即使发生无序事件或延迟时间结果也能保持一致。
同时还确保从持久存储读取记录时,table的结果可重放。
此外,事件时间允许在batch和stream 环境中对表程序使用统一的语法。
stream环境中的时间属性可以是batch处理环境中的记录的常规字段。
stream环境中的时间属性可以是batch处理环境中的记录的常规字段。
为了处理无序的事件并区分流中的准时事件和延迟事件,
Flink需要从事件中提取时间戳,并在时间上进行某种进展(所谓的水印)。
Flink需要从事件中提取时间戳,并在时间上进行某种进展(所谓的水印)。
定义的方法
DataStream-to-Table转换的时候
事件时间属性是在模式定义期间使用.rowtime属性定义的。
必须在转换的DataStream 中分配了时间戳和水印。
必须在转换的DataStream 中分配了时间戳和水印。
根据是否指定的.rowtime字段名,有两种方式定义时间属性
作为一个新的字段追加到schema
替换已经存在的字段
使用TableSource
通过实现DefinedRowtimeAttributes接口定义
getRowtimeAttributeDescriptors()方法返回用于描述时间属性的最终名称的RowtimeAttributeDescriptor列表
确保getDataStream()方法返回的DataStream与所定义的时间属性保持一致
子主题
JOIN操作
Regular Joins
常规联接是最通用的联接类型,其中联接输入的任何新记录或更改都是可见的,并且会影响整个联接结果。
但是,这个操作有一个重要的问题:它需要将连接输入的两边永远保持在Flink的状态。
因此,如果一个或两个输入表都在持续增长,那么资源使用也会无限增长。
Time-windowed Joins
带时间窗口的join是由join为此定义的
它会检查输入记录的时间属性是否在一定的时间限制内,即时间窗口。
与常规连接操作相比,这种连接只支持具有时间属性的 append-only table。
时间属性是准单次递增的,Flink可以在不影响结果正确性的情况下从其保存的状态中移除旧值(过期的状态值)。
时间属性是准单次递增的,Flink可以在不影响结果正确性的情况下从其保存的状态中移除旧值(过期的状态值)。
Join with a Temporal Table Function
子主题
子主题
Flink 状态管理和容错
状态计算
有状态计算
定义
程序在计算过程中,在Flink内部存储计算产生的中间结果,并提供给后续的Function或算子计算使用
状态数据存储
本地存储
Flink堆内内存
Flink堆外内存
第三方存储介质
RocksDB
自定义缓存系统
Flink 状态类型及应用
状态类型
定义
根据数据集是否根据key进行分区,将状态分为Keyed State 和 Operator State(Non-Keyed State)
类型
Keyed State
Keyed State 是 Operator State 的一种特例
Keyed State 实现按照key对数据集进行了分区,每个Keyed State仅针对一个Operator和Key的组合
Keyed State 可以通过Key Groups进行管理
主要用于算子的并行度发生变化时,自动重新分布Keyed State的数据
在系统运行过程中,一个Keyed算子实例可能运行一个或多个Key Groups 的keys
Operator State
Operator State 只和并行算子绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素的一部分状态数据
Operator State 支持当算子实例的并行发生变化是自动重新分配数据状态
Keyed State和Operator State 均有两种形式
托管状态(Managed State)
由Flink控制和管理状态数据,并将状态数据转换成内存Hash Tables 或RocksDB的对性存储
然后将这些状态数据通过内部的接口持久化到CheckPoints中
任务异常时,可以通过这些状态数据恢复任务
原生状态(Raw State)
由算子自己管理数据结构
在触发CheckPoint是,Flink并不知道状态数据的数据结构,只负责将数据转换成bytes数据保存到CheckPoint中
当从CheckPoints中恢复任务的时候反序列化出状态的数据结构
Managed Keyed State
ValueState[T]
与Key对应单个值的状态,比如 每个统计用户的交易次数,都需要在状态值count上在进行更新
value()
返回状态的当前值。
update(T value)
更新状态方法
ListState[T]
与Key对应元素列表的状态,状态中存放的是元素的List列表,如定义ListState[T]存储用户常访问IP
Iterable get()
获取元素
update(List<T> values)
更新元素
add(IN value)/addAll(List<T> values)
添加元素
ReducingState[T]
与key相关的数据元素单个聚合值的状态,存储经过指定ReduceFunction计算之后的指标,需要指定ReduceFunction完成状态数据的聚合
输入和输出类型必须相同
add(IN value)
添加元素
OUT get()
获取元素
AggregatingState[IN, OUT]
与key相关的数据元素单个聚合值的状态,存储经过指定AggregateFunction计算之后的指标,需要指定AggregateFunction完成状态数据的聚合
输入类型和输出类型不一定相同
add(IN value)
添加元素
OUT get()
获取元素
MapState[UK, UV]
与key对应键值对的状态,用于维护具有key-value结构类型的状态数据
put(UK key, UV value)/putAll(Map<UK, UV> map)
添加元素
UV get(UK key)
获取元素
类HaspMap API
remove(UK key)
contains(UK key)
Iterable<Map.Entry<UK, UV>> entries()
Iterable<UK> keys()
Iterable<UV> values()
Iterator<Map.Entry<UK, UV>> iterator()
Managed Operator State
说明
Operator State是一种Non-Keyed State,与并行操作算子有关
Flink中可以实现CheckpointedFunction 或 ListCheckpointed<T extends Serializable> 两个接口来定义操作Managed Operator State 的函数
CheckpointedFunction
说明
在每个算子中Managed Operator State都是以List的形式存储
算子和算子之间的状态数据是相互独立的
List 存储适合状态数据的重新分布
需要实现的方法
snapshotState(FunctionSnapshotContext context)
触发checkpoint的时候将调用此方法
这充当了到函数的挂钩,保所有状态都是通过在函数初始化时通过FunctionInitializationContext先前提供的方法公开的,或者现在通过FunctionSnapshotContext本身提供的方法公开的。
initializeState(FunctionInitializationContext context)
此方法在分布式执行期间创建并行函数实例时调用。
通常在此方法中设置其状态存储数据结构。
重分布策略
Even-split Redistribution
每个算子实例含有部分状态元素的List列表
整个状态数据是所有List列表的合计
当触发resyore/redistribution时,通过将状态数据平均分成与算子并行度相同数量的List列表
每个task实例中有一个List,其中包含零个或多个元素
Union Redistribution
每个算子实例含有所有状态元素的List列表
当触发resyore/redistribution时,每个算子都能获得完整的状态元素列表
案例
实现代码,见备注
并行度测试
并行度为1;env.setParallelism(1)
代码案例
并行度为4;env.setParallelism(1)
代码案例
ListCheckpointed<T extends Serializable>
说明
只支持List类型的状态
数据恢复时只支持Even-split Redistribution策略
需要实现的方法
snapshotState(checkpointId: Long, timestamp: Long)
获取函数的当前状态。状态必须反映此函数之前所有调用的结果。
restoreState(state: util.List[Long])
将函数或运算符的状态还原为前一个检查点的状态。
案例
实现代码,见备注
测试后续补充,报错了
StateDescriptor
说明
Flink通过创建StateDescriptor来获取相印的Sate操作类
StateDescriptor定义了状态的名称、状态中数据的类型参数、状态自定义函数
各种状态对应的实现
ValueStateDescriptor<T>
ListStateDescriptor<T>
ReducingStateDescriptor<T>
AggregatingStateDescriptor<IN, ACC, OUT>
MapStateDescriptor<UK, UV>
Stateful Function 定义
案例
完成对输入数据最大uid的获取
实现代码,见备注
State生命周期
任何类型的Keyed State都可以设定状态的生命周期(TTL),确保能够在规定的时间内及时的清理状态数据
案例
设置状态的生命周期
实现代码,见备注
TTL更新操作
setUpdateType
Disabled
TTL是禁用的。状态不会过期
OnCreateAndWrite
创建和写入时候更新TTL
OnReadAndWrite
创建、读取和写入都更新TTL
存在问题
所有的数据状态的TTL都是通过读取或者写入的时间进行更新的
如果某个状态指标一直不被使用或更新,则永远不会触发对该状态数据的清理,可能导致状态数据越来越大
解决
cleanupFullSnapshot
通过cleanupFullSnapshot设置触发STATE SNAPSHOT的时候清楚状态数据
注意
不适用于RockDB做增量Checkpointing的操作
cleanupInBackground()
cleanupIncrementally(@Nonnegative int cleanupSize,boolean runCleanupForEveryRecord)
可见性问题
返回条件
过期数据是否被清理
过期数据未被清理是否返回
setStateVisibility
ReturnExpiredIfNotCleanedUp
如果未清除过期的值,则返回该值
NeverReturnExpired
永远不要返回过期的值
Scala DataStream API直接使用状态
mapWithState
实例
filterWithState
flatMapWithState
无状态计算
定义
不会储存计算过程中计算的结果
也不会将结果用于下一步的计算过程
程序只会在当前的计算流中完成计算,计算完后就输出结果,然后接入下一条数据继续处理
使用场景
无状态计算
只要不需要使用到中间计算结果的
有状态计算
GEP(复杂事件处理)
获取符合一定规则的事件,状态计算可以将接入的事件进行存储,然后等待符合规则的事件触发
按照分、时、天统计pv、uv
需要利用状态来维护当前计算过程中产生的结果,比如范围内总次数、总用户数
机器学习的模型训练
在Stream上实现机器学习的模型训练,有状态计算可以帮助用户维护当前版本模型使用的参数
Checkpoints 和 Savepoint
Checkpoints 检查点机制
说明
Flinkz中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制
可以将统一时间点task/operator的状态数据(Keyed State、Operator State)进行全局统一快照处理
Flink在输入数据中间隔性地生成 Checkpoints barrier,将相隔时间段内的数据划分到相应的Checkpoint中
当应用出现异常时,operator可以根据上一次快照恢复所有算子之前的状态,保证数据的一致性
快照产生的过程非常轻量,高频创建且对Flink性能影响较小
Checkpoints过程中间状态一般保存在可配置环境中,通常是JobManger结点或HDFS上
配置
env.enableCheckpointing(10000)
每隔10s进行启动一个检查点
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
设置模式为:exactly_one,仅一次语义(默认)
env.getCheckpointConfig.setCheckpointTimeout(60000)
检查点必须在1分钟之内完成,或者被丢弃【checkpoint超时时间】 默认10分钟
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
确保检查点之间有1s的时间间隔【checkpoint最小间隔】
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
同一时间只允许进行一次检查点
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
设置周期性的外部检查点
将状态数据持久化到外部系统中,此时不会在任务正常挺值得时候清理检查点数据
此时可通过外部检查点对任务进行恢复
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
设置可容忍的检查点故障数,默认值为0,表示不容忍任何检查点故障。
env.getCheckpointConfig.setFailOnCheckpointingErrors() 已经弃用
Savepoint 机制
说明
Savepoint 是Checkpoint的一种特殊实现,底层也是Checkpoint实现的
Savepoint 是用户以手工命令的方式触发Checkpoints ,并将结果持久化到指定路径
在升级或维护急群众保存系统中的状态数据,避免数据丢失或失去恰好一次语义保证
Operator ID 配置
说明
如果您不手动指定id,它们将自动生成。
只要这些id不变,就可以从保存点自动恢复。
生成的id取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些id。
案例
Savepoint 状态
Savepoint 操作
说明
可以用命令行客户端来触发Savepoint 、取消具有Savepoint 的作业、从Savepoint 恢复,以及释放Savepoint
触发Savepoint
说明
当触发一个保存点时,将创建一个新的保存点目录,存储数据和元数据。
可以通过配置一个默认的目标目录或者使用触发器命令指定一个自定义目标目录
目标目录必须是JobManager和TaskManager都可以访问的位置,例如分布式文件系统上的位置。
操作
触发一个Savepoint
bin/flink savepoint :jobId [:targetDirectory]
这将触发ID:jobId作业的保存点,并返回创建的保存点的路径。
用YARN触发Savepoint
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发带触发ID:jobId和YARN应用程序ID:yarnAppId的作业的,并返回创建的Savepoint的路径。
使用Savepoint取消作业
bin/flink cancel -s [:targetDirectory] :jobId
触发ID:jobid作业的Savepoint ,并取消作业
从Savepoint中恢复任务
bin/flink run -s :savepointPath [:runArgs]
从指定Savepoint 中恢复任务
允许Non-Restored状态
某些情况下应用的算子和Savepoint中的算子状态可能不一致,可能出现无法恢复的状况
可以通过--allowNonRestoredState (short: -n) 来忽略无法匹配的问题
bin/flink run -s :savepointPath [:runArgs]
释放Savepoints数据
bin/flink savepoint -d :savepointPath
配置一个默认的保存点目标目录
state.savepoints.dir: hdfs:///flink/savepoints
状态管理器
Querable State
外部读写
flink 没有自己的source和sink,所以需要依赖外部系统
TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction 提取了两阶段提交协议的通用逻辑,基于此将 Flink 和支持事务的外部系统结合,构建端到端的 Exactly-Once 成为可能。
基于输出到文件的简单示例
beginTransaction
在事务开始前,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理数据时将数据写入此文件。
preCommit
在预提交阶段,我们刷新文件到存储,关闭文件,不再重新写入。我们还将为属于下一个 checkpoint 的任何后续文件写入启动一个新的事务。
commit
在提交阶段,我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
abort
在中止阶段,我们删除临时文件。
到Mysql的Exactly-Once
思路
checkpoint每10s进行一次,用FlinkKafkaConsumer
数据处理完后进行一次预提交数据库的操作
预提交成功
进行真正的插入数据库操作
插入成功
进行一次checkpoint,flink会自动记录消费的offset,可以将checkpoint保存的数据放到hdfs中
插入失败
数据回滚,恢复到上一个cp点
预提交出错
Flink程序就会进入不断的重启中,重启的策略可以在配置中设置
checkpoint记录的还是上一次成功消费的offset,下一次的checkpoint也不会做
本次消费的数据因为在checkpoint期间,消费成功,但是预提交过程中失败了
此时数据并没有真正的执行插入操作,因为预提交(preCommit)失败,提交(commit)过程也不会发生了。
只有将异常数据处理完后,重新启动这个Flink程序,它会自动从上一次成功的checkpoint中继续消费数据
Spark
深入理解微批、批处理原理
领域DSL技术
Antlr4语言技术熟悉SPL语法的开发
数据建模技术
安全算法类
LSTM算法
攻击者画像技术
知识图谱技术
收藏
0 条评论
下一页
为你推荐
查看更多