flink项目总结
2023-03-27 15:16:20 1 举报
AI智能生成
对于直播平台的实时分析 需求分析,技术点等
作者其他创作
大纲/内容
采集架构
实时数据采集(行为数据)
什么是openresty
openresty是一个基于nginx与lua的高性能web平台,其内部集成了大量精良的lua库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态web应用、web服务和动态网关。
openresty的目标是让你的web服务直接跑在nginx服务内部,充分利用nginx的非阻塞I/O模型,不仅仅对http客户端请求,甚至于对远程后端诸如mysql、postgreSQL、memcached以及redis等都进行一致的高性能响应。
openresty的目标是让你的web服务直接跑在nginx服务内部,充分利用nginx的非阻塞I/O模型,不仅仅对http客户端请求,甚至于对远程后端诸如mysql、postgreSQL、memcached以及redis等都进行一致的高性能响应。
flume配置优化
tailDirSource有一个小bug,即log文件重命名后会重复采集数据,需要修改flume的源代码ReliableTaildirEventReader,使用Linux的inode作为文件的唯一标识,这样就不会重复采集数据了
kafkaChannel的优化
kafkaChannel其实是kafka的一个producer,我们可以修改kafkaChannel的配置(即producer的配置)实时优化和保证数据不丢失
优化1:配置kafkaChannel的ACK机制为-1,保证数据不丢失,而且创建kafka的topic时,副本数量为2,即数据就保存两份,数据保存2份已经很靠谱了,我们公司的卡夫卡集群配置是2个cpu,24核,48G内存,并且是SSD的磁盘,并且创建topic分区数量的时候,分区的数量为机器的数量*cpu个数*每个cpu的核数
优化2:设置kafkaChannel的压缩核失败重试次数
优化3:数据写入到kafka之前,自定义序列化器,将数据先序列化,这样可以减少网络IO的压力protobuff和kyro、protostuff,现在我们使用protobuff和protostuff其中的一种对数据进行序列化
FileBeat
什么是filebeat
filebeat是一个用于转发和集中日志数据的轻量级传送器。作为代理安装在您的服务器上,filebeat监控您指定的日志文件或位置,收集日志事件,并将它们转发发哦es或者logstash以进行索引
工作原理
当启动filebeat时,它会启动一个或多个输入,这些输入会在您为日志数据指定的位置中查找。对于filebeat定位的每个日志,filebeat都会启动一个收割机。每个harvester读取单个日志以获取新内容并将日志数据发送到libbeat,libbeat聚合事件并将聚合数据发送到您为filebeat配置的传输
需求分析
需求一:统计新老用户量
状态+布隆过滤器
方案一
按设备类型进行keyBy,然后使用BloomFilter进行判断是否为新用户。
存在问题:如果某个设备类型的用户比较多可能会出现数据倾斜,而且一个分区中会有多个组,一个组对应一个bloomfilter,那么一个分区就会有多个bloom filter,也会占用更多资源。
存在问题:如果某个设备类型的用户比较多可能会出现数据倾斜,而且一个分区中会有多个组,一个组对应一个bloomfilter,那么一个分区就会有多个bloom filter,也会占用更多资源。
方案二
按照设备id进行keyBy,然后定义一个OperatorState保存bloom filter进行判断是否为新用户,一个分区对应一个bloom filter,更加节省资源,不会出现数据倾斜。
如果数据中出现isNew为空或数据字段没有isNEW的情况,可以利用此方法解决。
如果数据中出现isNew为空或数据字段没有isNEW的情况,可以利用此方法解决。
布隆过滤器怎么实现去重
简述
布隆过滤器是一种快速判断一个元素是否存在于集合中的数据结构,它利用多个哈希函数将元素映射到一个位数组中,如果所有的哈希函数都指向已经被标记的位,那么可以认为元素已经存在于集合中。
实现
1、初始化一个位数组,所有的位都初始化为0
2、选择多个哈希函数,并将元素分别哈希得到对应的哈希值
3、将每个哈希值对应的位在数组中标记为1
4、当需要判断一个元素是否存在于集合中时,将元素进行哈希得到多个哈希值,并判断对应的位是否都被标记为1。如果所有的位都被标记为1,则可以认为元素存在于集合中;否则,可以认为元素不存在于集合中。
需要注意的是,由于布隆过滤器存在误判的可能性,因此不能完全替代传统的去重方法。可以将布隆过滤器作为一种快速判断是否需要进行传统去重的预处理方法,提高去重的效率
2、选择多个哈希函数,并将元素分别哈希得到对应的哈希值
3、将每个哈希值对应的位在数组中标记为1
4、当需要判断一个元素是否存在于集合中时,将元素进行哈希得到多个哈希值,并判断对应的位是否都被标记为1。如果所有的位都被标记为1,则可以认为元素存在于集合中;否则,可以认为元素不存在于集合中。
需要注意的是,由于布隆过滤器存在误判的可能性,因此不能完全替代传统的去重方法。可以将布隆过滤器作为一种快速判断是否需要进行传统去重的预处理方法,提高去重的效率
使用RocksDB作为StateBackend
原因
可以存储更多状态、有长窗口、key、value的可以保存更大的数据(2G),并且可以进行增量CheckPoint。
方案一
添加依赖
方案二
修改配置
使用clickHouse进行多维复杂统计
实现方案
使用click house的表引擎MergeTree,可以将同一个分区中,id相同的数据进行merge,可以保留最新的数据,可以使用这个特点实现flink+click house实现数据一致性
存在问题
写入到click house中的数据不能立即merge,需要手动optimize或后台自动合并
查询时在表名的后面加上final关键字,就只查最新的数据,但是效率变低了
查询时在表名的后面加上final关键字,就只查最新的数据,但是效率变低了
如何设计click house的表
1、可以支持维度查询(大宽表)
2、按照时间段进行查询(将时间作为表的字段并且建分区表)
3、可以统计出PV、UV(去重查询)
4、支持分区(按照时间进行分区)
5、支持覆盖(ReplacingMergeTree)(对查询结果准确性要求高的,表名后面加final)
6、生成一个唯一的id(在kafka中生成唯一的id,topic分区+偏移量)
7、相同的数据要进入到相同的分区(按照数据的时间即EventTime进行分区)
2、按照时间段进行查询(将时间作为表的字段并且建分区表)
3、可以统计出PV、UV(去重查询)
4、支持分区(按照时间进行分区)
5、支持覆盖(ReplacingMergeTree)(对查询结果准确性要求高的,表名后面加final)
6、生成一个唯一的id(在kafka中生成唯一的id,topic分区+偏移量)
7、相同的数据要进入到相同的分区(按照数据的时间即EventTime进行分区)
拼接自定义数据唯一id
目标
为了实现kafka到click house的数据一致性
实现方案
使用自定义KafkaDeserializationSchema,可以读取到kafka数据中的topic、partition、offset,将他们拼接成唯一的id
需求二:直播数据分析
方案一(舍弃)
1、将数据来一条就写入到Redis/mysql(延迟低、效率低、对数据库压力大)
2、再写一个job将各种明细数据写入到click house中(提交了2个job、数据重复计算)
2、再写一个job将各种明细数据写入到click house中(提交了2个job、数据重复计算)
方案二
1、将数据攒起来批量(不是简单的增量聚合,不能使用窗口,而是使用定时器)写入到Redis/mysql(延迟高、效率高、对数据库压力小)
2、在同一个job中,将数据写入到click house中(同一个主题(类型)的数据尽量在一个job中完成,将不同的数据打上不同的标签,侧流输出)
2、在同一个job中,将数据写入到click house中(同一个主题(类型)的数据尽量在一个job中完成,将不同的数据打上不同的标签,侧流输出)
需求三:直播礼物实时分析
实现方案
关联维表的解决方案:
1、每来一条数据查一次数据库(慢、吞吐量低)
2、可以使用异步I/O(相对快,消耗资源多)
3、广播state(最快、适用于少量数据、数据可以变化的)
1、每来一条数据查一次数据库(慢、吞吐量低)
2、可以使用异步I/O(相对快,消耗资源多)
3、广播state(最快、适用于少量数据、数据可以变化的)
需求四:热门商品Top实时统计
需求
统计十分钟内,各个分类、各种事件类型的热门商品
实现方案
实现技术:为了将一段时间的数据攒起来,才能统计出topN
窗口选择:统计10分钟,每隔1分钟统计一次结果,为了得到的结果比较准确(平滑)我们使用滑动窗口。窗口的长度和滑动的步长可以根据实际情况进行调整
时间类型:为了精确的统计出用户在实际浏览、加入购物车、下单等热门商品的信息的变化情况,使用EventTime类型的窗口
窗口选择:统计10分钟,每隔1分钟统计一次结果,为了得到的结果比较准确(平滑)我们使用滑动窗口。窗口的长度和滑动的步长可以根据实际情况进行调整
时间类型:为了精确的统计出用户在实际浏览、加入购物车、下单等热门商品的信息的变化情况,使用EventTime类型的窗口
map和mapPartiton的区别
map操作是针对RDD中的每个元素进行转换操作,因此输入数据是单个元素,输出数据也是单个元素。map操作在执行时,会将输入rdd中的每个元素分别传递给map函数进行处理,并将处理结果作为新的rdd中的一个元素。
mappartition操作则是针对rdd中的每个分区进行转换操作,因此输入数据是一个分区的所有元素,输出数据也是一个分区的所有元素。mappartition操作在执行时,会将输入rdd中的每个分区分别传递给mappartition函数进行处理,并将处理结果作为新的rdd中的一个分区
因此,map操作适用于处理单个元素的转换操作,而mappartition操作适用于处理整个分区的转换操作,可以在一定程度上减少通信开销,提高计算效率。但是,mappartition操作需要注意处理结果的顺序和数据分布问题,因为一个分区的处理结果会被分发到不同的节点上进行合并。
mappartition操作则是针对rdd中的每个分区进行转换操作,因此输入数据是一个分区的所有元素,输出数据也是一个分区的所有元素。mappartition操作在执行时,会将输入rdd中的每个分区分别传递给mappartition函数进行处理,并将处理结果作为新的rdd中的一个分区
因此,map操作适用于处理单个元素的转换操作,而mappartition操作适用于处理整个分区的转换操作,可以在一定程度上减少通信开销,提高计算效率。但是,mappartition操作需要注意处理结果的顺序和数据分布问题,因为一个分区的处理结果会被分发到不同的节点上进行合并。
kafka-flink-clickhouse的端到端一致性怎么保证
1、数据写入kafka时保证幂等性
在数据写入kafka时,可以保证数据的幂等性,即相同的数据只会被写入一次,可以通过设置producer的acks参数和消息的key来保证
2、数据从kafka到flink的消费
在flink中,可以使用kafka consumer来消费kafka中的数据。flink consumer的可靠性保证可以通过以下几种方式来实现:
设置consumer的group id,确保同一个group内的consumer只有一个可以消费同一个partition
设置consumer的enable.auto.commit为false,手动提交offset
将offset存储到外部存储中,以便在consumer出现故障时,可以恢复到上一次消费的位置
设置consumer的group id,确保同一个group内的consumer只有一个可以消费同一个partition
设置consumer的enable.auto.commit为false,手动提交offset
将offset存储到外部存储中,以便在consumer出现故障时,可以恢复到上一次消费的位置
3、fllink中的数据处理
在flink中,对数据进行处理和转换,可以通过flink的Exactly-once语义来保证端到端的一致性。可以通过以下方式来实现:
设置flink的checkpoint机制,定期将state保存到外部存储中,以便在flink出现故障时,可以从上一次checkpoint恢复
使用flink的transactiionAPI,将数据写入clickhouse时,可以使用flink的transactionAPI来保证事务的一致性
设置flink的checkpoint机制,定期将state保存到外部存储中,以便在flink出现故障时,可以从上一次checkpoint恢复
使用flink的transactiionAPI,将数据写入clickhouse时,可以使用flink的transactionAPI来保证事务的一致性
4、数据写入clickhouse
在将数据写入clickhouse时,可以使用以下几种方式来保证数据的完整性和一致性:
设置clickhouse的幂等性,即同一个数据只会被写入一次,可以通过设置数据的primary key来实现
将数据写入到本地文件中,再通过clickhouse提供的数据导入工具(如clickhouse-client)来导入数据,确保数据的一致性
设置clickhouse的幂等性,即同一个数据只会被写入一次,可以通过设置数据的primary key来实现
将数据写入到本地文件中,再通过clickhouse提供的数据导入工具(如clickhouse-client)来导入数据,确保数据的一致性
定时器的功能和实现思路
概念
Event Time
事件发生的事件,即数据本身携带的时间戳,与处理时间和机器时间无关
Processing Time
处理时间,即flink处理数据的时间,与数据本身的时间戳无关
Ingestion Time
数据摄取时间,即数据进入flink系统的时间。对于批量攒起来写入到redis等组件的场景,一般使用processing time。flink提供了Time Service,可以在processing time上创建定时器,定时器可以在指定的时间点触发一个回调函数。在实现批量写入redis的场景中,可以通过time service来实现一定时间间隔内数据的批量写入。
实现
1、在flink中定义一个processingFunction,并实现器processElement和on Timer两个方法
2、在processElement方法中,将数据保存到状态中,并根据当前时间判断是否需要创建定时器
3、在on timer方法中,将状态中的数据批量写入到redis中,并清空状态
4、在flink的ExecutionEnvironment中设置time service的时间类型,即processing time
5、在DataStream中使用process方法,将processFunction传入,即可实现数据批量写入redis的功能。
需要注意的是,定时器的触发事件是不准确的,可能存在一定的延迟。在实际应用中,需要根据具体的场景和需求来设置定时器的时间间隔,以保证数据的实时性和准确性。
2、在processElement方法中,将数据保存到状态中,并根据当前时间判断是否需要创建定时器
3、在on timer方法中,将状态中的数据批量写入到redis中,并清空状态
4、在flink的ExecutionEnvironment中设置time service的时间类型,即processing time
5、在DataStream中使用process方法,将processFunction传入,即可实现数据批量写入redis的功能。
需要注意的是,定时器的触发事件是不准确的,可能存在一定的延迟。在实际应用中,需要根据具体的场景和需求来设置定时器的时间间隔,以保证数据的实时性和准确性。
流式计算分组统计的实现思路
1、将输入数据按照key进行分组,可以利用flink的keyBy算子
2、对于每个分组,使用flink中的窗口算子来定义时间窗口
3、在窗口内,对数据进行聚合操作,例如对每个分组的数据进行求和、求平均等操作
4、将聚合结果写入到外部存储系统(例如redis、mysql等),可以使用flink中的sink算子
2、对于每个分组,使用flink中的窗口算子来定义时间窗口
3、在窗口内,对数据进行聚合操作,例如对每个分组的数据进行求和、求平均等操作
4、将聚合结果写入到外部存储系统(例如redis、mysql等),可以使用flink中的sink算子
经纬度坐标转地理位置信息实现思路
1、获取经纬度坐标,可使用flink中的数据源(例如kafka、rocketmq等)
2、将经纬度坐标发送给第三方的地理位置服务API(GeoCodingAPI),获取地理位置信息,可以使用flink中的异步I/O
3、将地理位置信息写入到外部存储系统(例如Redis、Mysql等),可以使用flink中的sink算子
2、将经纬度坐标发送给第三方的地理位置服务API(GeoCodingAPI),获取地理位置信息,可以使用flink中的异步I/O
3、将地理位置信息写入到外部存储系统(例如Redis、Mysql等),可以使用flink中的sink算子
实时任务怎么部署,资源怎么分配?
部署方式
实时任务通常需要部署在分布式的集群环境中,以保证高可用性和可扩展性。可以选择容器技术(例如Docker、Kubernetes等)来部署和管理实时任务,以便更好地控制资源和扩展性。
资源分配
资源分配需要根据实时任务的计算复杂度、数据量和实时性要求等因素进行综合考虑。在flink中,可以通过以下方式来控制资源分配:
并行度设置
flink中的每个算子都可以设置并行度(即任务并行执行的并发度),可以根据实际情况调整并行度来控制资源消耗
算子链划分
flink会根据算子的依赖关系自动划分算子链,可以通过手动设置算子之间的边界来控制资源消耗
状态后端选择
flink中的状态可以存储在内存、文件系统或者外部存储系统(例如RocksDB、HDFS、Redis等)中,可以根据数据量和实时性要求选择不同的状态后端来提高性能。
监控和调优
在实时任务运行过程中,需要进行监控和调优,以及及时发现和解决问题。可以使用flink自带的Dashboard和TaskManager日志来进行监控,也可以使用第三方的监控工具(例如Prometheus、Grafana等)来进行可视化监控和调优
0 条评论
下一页