真实模拟面试流程
2024-12-09 16:26:34 0 举报
AI智能生成
自己面试的过程、真实
作者其他创作
大纲/内容
自我介绍
面试官您好,我叫xxx 入行已经三年了,前一份工作是在xxx的大数据部门担任大数据开发。刚开始进去的时候做的离线数仓的一些工作、但是后面公司准备开实时部门、我也开始学习实时的一些技术栈、往实时这方面转、期间我参与过公司的xxxApp实时架构的设计,我们的产品是xxxapp上面分很多板块、挂号的一个板块、商城一个板块、线上问诊一个板块还有其它的、我主要负责线上问诊这一个板块、做过一些日志系统的信息采集、清洗、因为我们有结构化和非结构化数据、需要统一口径。其它的核心的工作是实时指标分析,包括监测领域主题、收入主题等。通过这些主题指标分析,可以帮助在线问诊平台提供更好的服务,提高用户满意度,并确保医生和患者的体验质量等、而且我们要做到利用20%的患者病例信息治疗80%的患者。我通过公司的官网有了解到公司的业务的偏向xx领域,我对这个领域很感兴趣,希望多些了解!
公司被调
我上一家公司是在xxx,公司有接近千人的规模、我住在xxx距离公司差不多两三公里、公司大数据开了7个部门(项目部4、产品3、分析团队4、安全团队4、运营团队4人、技术部门12、数据工程10人)
数据量
一条行为数据约1k 用户和医生每天约产生100条数据 日活约200万 一天就有约200G的数据量产生、每天业务数据(视频、语音、图片)50G数据量产生、QPS3000条,每天产生2亿多条数据、kafka里面的主题约有10来个、每个主题里面约3个分区、共有30个分区、有2个副本 保存3天 预留30% 大概2T左右、FLink任务数量约有300多个、平均6个任务一个指标、约要出50多个指标、并行度是20、Ck的节点有5个、存的是分布式表、副本3个、放到Clickhouse中数据会压缩LZ4 备份 定期合并相同主键的数据,保留90 约50多T左右、服务器(128G内存 20核40线程 8T机械硬盘,2T固态硬盘) = 28台、有专门的维护人员,离线的数据存一年至少翻四倍
主题
用户活跃、在线医生、挂号预约、在线问诊、电子病例、处方开药、支付与收入、数据安全与合规性、监控域主题、性能主题、错误与异常
项目背景
xxx随着越来越智能化 ,功能逐渐完善。更好的跟踪用户的信息 ,允许医生和医疗团队实时监测患者的健康状况,实时病历数据、实验室结果和生命体征监测 ,可以更准确地做出治疗建议。 可以帮助医疗机构更有效地分配资源 ,如医生、护士和病床 ,以应对患者需求的波动 ,在某些医疗领域 ,需要满足法规和标准要求对患者数据进行实时监控和安全分析,以 确保数据隐私和合规性
技术架构
技术架构我们主要考虑的就是:数据量的大小、业务需求、行业内的经验、技术的成熟度、开发和维护成本、经济。我们的数据来源有很多地方比如制药企业、临床医学的实验数据、社交网络和我们平台产生的数据、获取到这些数据放到我们NGINX里面、业务数据我们放到数据库Mysql+微服务提供一个负载均衡的一个效果、数据采集工具用的是MaxWell和Flume、NGINX的数据通过FLume拦截器以事件分主题使用kafkaChannel、业务数据库通过MaxWell监控数据库的变化进行一个增量拉取变更的数据、数据传输工具用到了Kafka、FLume和MaxWell将数据发送到kafka里面进行一个削峰、数据计算框架用的Flink、Flink会去消费kafka的数据进行一个处理、宽表我们是设置到kafka里面提供复用性、检索分析工具用的CK、Flink处理好的数据发送的Ck里面进行一个分析、可视化工具用的是SuperSet、Ck通过数据集成到superset进行一个图的展示、计算调度的话资源调度用的是Yarn、FLink任务的提交通过Yarn提交方式、高可用调度用的ZooKeeper、监控工具用的是prometheus、通过配置prometheus的配置文件、使用grafna进行一个指标显示、他可以监控软件和硬件的所有指标。
业务指标
支付成功分析
可以帮助医疗机构更好地理解其财务状况,改进服务质量,并优化资源分配。1 初始化流环境。2.从Mysql中获取订单表,订单详情表。3.转换数据结构 转换成DataBean、筛选出订单状态为成功的订单表。4.然后根据将两个表进行一个表连接。没有连接上的和迟到的数据进行一个合并然后异步IO去Mysql查询。5.关联医院ID将医院信息表进行一个连接,然后在将患者信息进行一个连接。10.在订单详情表与商品金额进行一个连接,进行一个商品价格的补全,地理位置的一个集成,发送到kafka(复用性)。6.判断年龄不同的年龄设置不同年龄组。6.引入水位线延迟5秒。7.将患者性别,患者年龄分组,将两个字段进行一个连接。8.开窗,滚动窗口大小1分钟。9.进行一个全量聚合,在将窗口的开始时间和结束时间设置到databean里面。11.写到CH里面 然后就可以根据维度进行一个汇总数据。通过BI或者SPringBoot接口进行一个展示或使用。
用户评价
实时分析用户的评价、医院满意度越高可以提高用户忠诚度、口碑、减少流失率、增加收入等。1.初始化流环境,2.获取数据源、获取评价事件,3.先转换结构,转为DataBean。3.关联医院ID、将医院表进行一个广播根据医生的ID进行一个连表,没有关联上的异步去查询,然后补全DataBean数据的医院ID和医院名称维度。5.按照用户id和医院id和医生ID进行分组 将用户ID和医院ID和医院ID进行一个连接。6.通过状态编程统计新增人数、设计一个标记 然后获取到表里面的状态进行一个赋值,判断等于空新评价 然后设置到表dataBean里面的字段(为第一次评价的) 收集bean对象。7.引入水位线延迟5秒。8。根据医院的ID和医生ID进行一个分组。9.开窗 滚动窗口大小1分钟。10.进行一个全量聚合。将窗口的开始时间和结束事件设置到DataBean里面。11.最后输出到CH里面 通过BI或者SPringBoot接口进行一个展示或使用。
集群规划设计
管理节点:RM、HM、NN、消耗内存的分开部署、数据传输数据比较紧密的放在一起(flink&ck)、客户端尽量放在一到两台服务器上,方便外部访问、有依赖关系的尽量放到同一台服务器(hive&spark)
HR基本面试
也是由于疫情解封,在线问诊的人数逐渐的减少,部门开始逐渐裁员了。我当时也意识、即使能过第一波裁员,后面还会有的而且也是为了寻找更具挑战性和发展机会的职位、渴望在一个更好的平台上发挥我的技能和经验、考虑良久、提出了离职,刚好我朋友也在这座城市,我就想的是、刚好也换个城市生活。我也想向你推销一下我自己,我沟通能力挺强的、喜欢专研新技术 ,具有自学能力的、很容易适应新环境。我上家工资的组成:基本工资+奖金+福利+培训+工作技能+五险
你想了解本公司什么内容?
贵公司的晋升机制是什么样的?
贵公司未来的发展方向?比较近的一个目标?
贵公司公司的规模,是离线实时都在做吗?
Flink
双流Join
join
先将数据缓存在state中 当窗口触发计算时进行join操作 为了满足流处理的实时性要求(等待所有参与join的事件到达后再进行处理) 也为了实现容错性(将状态持久化到可靠存储系统中)
CoGroup
除了输出匹配的元素对以外,未能匹配的元素也会输出 谁调的方法 谁就是左表
Interval Join
基于一个上界开区间 下界闭区间 在此范围的就可以做连接 数据必须先keyby才能进行连接
FlinkCDC和FlinkCEP
FlinkCDC是一种用于捕获和处理数据库变化的技术,将数据库变化以流的形式提供给Flink应用程序
基于查询
Kafka JDBC Source 、Sqoop
批次
基于Binlog
Canal、Maxwell、Debezium
流式
Debezium
1.x痛点
一致性通过加锁保证、全局锁可能导致数据库hang住、表级锁会锁住加锁的表无法更新、只支持单并发、全量读取阶段不支持检查点
2.x
根据 Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置
每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。
将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数
据,实现了并行读取的目标。同时在每个 Chunk 读取的时候可以单独做 CheckPoint
据,实现了并行读取的目标。同时在每个 Chunk 读取的时候可以单独做 CheckPoint
在增量部分消费从低位点到高位点的 Binlog、根据主键,对 buffer 中的数据进行修正并输出
Flink的状态
算子分为有状态和无状态、无状态根据当前输入输出结果、有状态就是需要历史数据进行一个计算得到结果。状态分托管和原始状态、托管就是Flink帮我管理、原始就是自定义 自己开辟空间进行管理。托管状态又分算子状态和按键分区状态。在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。但是用一个进程去所有状态扫描看是否过期、太耗资源、所有就出现设置失效时间、只支持处理时间。状态的存储、访问以及维护,都是由一个可插拔的组件决定的、这就是状态后端。
按key分区状态
可以通过富函数类自定义key状态、从某个角度讲、Flink所有算子都是有状态的、每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。
值状态(ValueState):只保存一个值
列表状态(ListState):与集合相似
Map状态(MapState):kv形式
归约状态(ReducingState):合并更新
聚合状态(AggregatingState):同归约、只不过更加灵活
算子状态
当前并行度的算子共享一个状态
列表状态(ListState)
联合列表状态(UnionListState)
广播状态(BroadcastState)
状态TTL
.setUpdateType():指定了什么时候更新状态失效时间
.setStateVisibility():清除操作并不是实时的,当状态过期之后可能继续存在,如果对它进行访问,能否正常读取到就是一个问题了
状态后端
1.13以前
1.13以后
恢复策略
Flink的分布式缓存
允许将外部文件或数据加载 到任务中 方便访问
通过getRumTimeContext.getCache放到每个taskManager中
通过getRumTimeContext.getDistributedCache().getFile()访问缓存中的数据
缓存数据不会自动清理 只能手动清理
使用广播变量的注意事项
适合静态表 避免广播大数据集 广播的数据需要序列化 需要性能开销、需要调大TaskManager内存、使用广播变量之前 进行充分的测试和性能评估 以便在程序中不会造成影响、数据量足够小、适应TaskManager内存 不然会导致内存不足的问题
flink的并行度有了解嘛 哪几种级别
配置文件级别
fink.conf.yml
执行环境级别
setParallism()设置在方法后面设置 默认环境内所有都是此配置
算子级别
通过setParallism()定义单个算子
代码运行时
通过在linux上面执行任务时 设置的并行度
检查点超时
原因是反压
因为CheckPoint barrier跟随普通数据一起流的 不会越过普通数据 导致端-端期间的时长边长 为了保证准确一次 多个管道就需要对齐 接收到较快的管道数据后 后面的数据就会缓存起来不处理 放到state里面 导致checkPoint变大 还有问题可能定期生成器出问题了 就是代码问题
解决办法
降低Source的并发度、拉取频率、拉取量;提高checkpoint发送频率;提高同时能够进行的checkpoint数量;提高checkpoint超时时间;启用非对齐checkpoint
内存管理
FullGC
为了处理更大数据开了很大的内存中间
OOM
超过了分配给JVM的内存大小
缓存未命中
CPU计算,从CPU缓存中读取数据
任务调度:图是由 点 和 边 组成
流图
最初的图、用来表示程序的拓扑结构
generrate类
transformations
是一个list,依次存放了 用户代码里的算子
任务图
流图经过优化成任务图、提交给JobManager的数据结构
组成部分
JobVerteX
作业中的一个或多个算子、串联形成一个任务
JobeEdge
JobVertex之间的数据流连接、还包括数据传输的方式
Parallelism
并行度
主要作用
任务的划分、依赖、并行度,故障恢复
执行图
描述和管理作业的执行计划的关键数据结构
组成部分
JobVertex(作业顶点)
ExecutionVertex(执行顶点)
ExecutionEdge(执行边缘)
主要作用
任务的划分与调度、资源分配管理、优化与调优
物理执行图
部署task后形成的图、并不是具体的数据结构/类
主要作用
为调度器、优化器和资源管理器提供关键的信息,确保作业高效、容错地执行
组件通信
部署模式
YARN
1.Session Mode
启动一个集群,保持会话。通过客户端提交作业、任务会在集群里面竞争资源、适合规模小、执行时间短的大量作业
2.Pro-Job Mode
资源隔离、每一个作业对应一个集群、实际应用的首选模式、需要借助资源框架来启动集群、作业提交到客户端
任务提交流程
3.application Mode
应用直接提交到JobManager、解决Pro-Job Mode模式下,使用客户端提交、需要保持连接大量占用网络带宽的问题。
可用参数
-d:分离模式
-jm:JobManager的内存
-nm:配置在Yarn的Web UI界面的任务名
-qu:指定的Yarn的队列名
-tm:配置每个TaskManager的内存
-p:并行度
运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么 http://hadoop102:8082-查看已经停止的 job 的统计信息
Standalone
资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理、只用在开发测试或作业非常少的场景下、只支持会话模式
系统架构
JobManager
ResourceManager
负责资源的分配和管理
Dispatcher
用来提交应用,并且负责为每一个新提交的作业启动一个新的JobMaster 组件
JobMaster
负责处理单独的作业(Job)
TaskManager
TaskManager都是一个JVM进程、负责执行任务处理数据、启动后 TM会向资源管理注册 Slot 然后交给JM调用、TM可以缓存数据 可以跟其他运行同一应用的 TaskManager交换数据
核心概念
并行度
一个算子可以拆分成多个子任务(subTask),子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行、subtask的个数称为并行度、默认并行度就是核心数
算子链
并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task)、可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
任务槽
相当于把一个TM内存划分几份、slot目前仅仅用来隔离内存、不会涉及CPU的隔离、slot数量配置为机器的CPU核心数。默认情况 不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行、平均分配资源。
触发任务执行
由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为 延迟执行 或 懒执行。
KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合
keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream
minBy
一条数据的所有字段计算、替换
min
取一个字段计算、赋值
富函数类
所有的flink函数类都有Rich版本、由open、close、getRuntimeContext()方法
分流
普通方法是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。使用测输出流!
定时器
基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法、对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。
优化
JM2~4G 足够
单个 TM2~8G 足够
开发完成后,先进行压测。任务并行度给 10 以下,测试单个并行度的处理上限。然后
总 QPS/单并行度的处理能力 = 并行度、并行度*1.2 倍,富余一些资源
总 QPS/单并行度的处理能力 = 并行度、并行度*1.2 倍,富余一些资源
source端
数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数
transform
如果并发较大,建议设置并行度为 2 的整数次幂
sink
根据 Sink 端的数据量及下游的服务抗压能力进
行评估
状态
Check Point
分钟级别(1 ~5 分钟)
也需要考虑时效性的要求,需要在时效性和性能之间做一个平衡,如果时效性要求高,结合 end- to-end 时长,设置秒级或毫秒级
Metrics
Meter平均值
记录此 Task 或者算子每秒接收的记录数
Histogram 直方图
1.最大值、最小值
2.记录数据处理的延迟信息
Gauge
现在TaskManager的 JVM heap内存用了多少
Count计数器
算子的接收记录总数 和 发送记录总数
kafka
零拷贝
零拷贝不是完全没有拷贝,只是省去了没必要的拷贝。一个场景:需要把磁盘的某个文件内容拷贝远程服务器。正常情况需要四步:磁盘文件读取到readBuffer、readBuffer通过CPU发送到用户缓冲区、用户缓冲区写到socketBuffer、socketBuffer复制到网卡NIC、最后到远程服务器。零拷贝的原理是把用户缓冲区和socketBuffer省掉,因为这两步需要CPU上下文切换,性能影响、直接读取到readBuffer中,通过DMA技术直接把readBuffer的数据发送到网卡NIC。操作在linux上面是sendfile()方法、在java程序是fileChannel.transferto()方法。底层也是sendfile()。
页缓存和MMAP技术
MMAP允许将文件的全部或者部分映射到进程的地址空间中、文件内容可以直接通过内存访问
页缓存将磁盘上的文件数据以页面为单位缓存在内存中、以加速文件的读取操作
MMAP用于管理消息日志文件、而页缓存是管理各种文件
避免重复消费
首先kafka中消息存储在broker中、每条消息对应一个offset、cousumer每次消费数据都会记录数据的offset到(consumer_offsets,默认对50个分区取值)中、可以手动和自动提交。自动提交默认5秒去拉取最大的偏移量(下次该消费的数据offset)然后提交。提交的间隔中kill进行进程或者宕机。就会出现重复消费。还有一种情况就是partitionBalance机制、当消费者增加和减少、主题新增分区、就会触发重新平衡(默认5分钟)导致offset自动提交失败也会出现重复消费。解决办法:1.可以用异步的方式来处理消息,缩短单个消息的消费的时长、2.调整消息处理的超时时间拉长一点、3.减少一次性从Broker上拉取数据的条数、4.使用幂等性(耗费消费资源,增加集群压力)+事务。
如何保证消息的消费顺序
消息发送到主题里面对key取余存储分区里面、消费者根据消息平衡消费指定分区、数据已经乱序了。为了保证数据的有序性、可以自定义路由策略、自己按key分区然后指定消费者消费某个分区。
ISR和OSR机制
ISR是一个队列集合 放的是Follower节点 类似于Raft算法中的候选人概念 表示有当主的能力 说明它节点正常 并且数据相差不大 如果Follower超过10秒没读取数据或者数据量与leader相差数超过4000条 都会提出ISR集合 放到OSR队列中 待该 Follower 恢复后,Follower 会 读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步数据操作 Leader故障后选举第一个恢复的node作为leader提供服务 以它的数据为基准 称为脏leader选举 其它的保证数据一致性 先截取高于HW的部分截掉 在去从新的leader同步数据
业务数据如何在kafka存储的
单表单主题
一个分区
数据主键分区
多表单主题
根据主键
根据表名的hash去传递
上下游切换
允许您在Kafka中建立一个灵活的消息管道,将数据从一个主题传递到另一个主题,并进行任何必要的数据转换和处理
kafka挂掉原因以及怎么恢复
硬件故障
恢复
软件故障
通过查看日志是否有错误消息
资源耗尽
增加硬件
数据损坏
通过备份恢复数据
监控系统
通过监控系统及时发现问题 以及解决
kafka消息数据积压 kafka消费能力不足怎么办?
如果是处理速度及时 必须 消费者数=分区数
如果是处理速度不及时 提高每批次拉取的数量
框架中kafka起到了什么作用 不用kafka有什么影响
会把消息持久到磁盘 有效避免消息丢失的风险
对生产者和消费者的一个解耦 并且中间做了一个缓冲解决了生产者与消费者处理速度不一致的问题
影响
不能保证消息的有序性
实时的消息易丢失 没有kafka做削峰 会造成数据堵塞
生产者端的Request流程,消息打批之后发出?
当batch大小达到了闻值(batch.size) 或batch积累消息的时间超过了linger.ms时batch被视为“已完成或"准备就绪”。batch.size和inger.ms都是Producer端的参数。batch.size默认值是16KB,而linger.ms默认值是0毫秒。一旦达到了batch.size值或越过了linger.ms时间,系统就会立即发送batch。
kafka的存储格式
Segment
.index 索引文件
索引 + offset数据的偏移量
顺序读写 默认申请10M的空间
每一个 .index索引文件都是存的相同的索引
稀疏性:一个区间一个索引
稠密性:一条数据都有一个索引
顺序读写 默认申请10M的空间
每一个 .index索引文件都是存的相同的索引
稀疏性:一个区间一个索引
稠密性:一条数据都有一个索引
不走内存 直接存文件
硬盘 顺序读写 比 磁盘快
所以提前申请好空间
硬盘 顺序读写 比 磁盘快
所以提前申请好空间
.log 数据文件
为上一个数据文件最后一条数据的offset值(偏移量)
文件名得长度必须达到二十位 不足得用 0 填充
文件名得长度必须达到二十位 不足得用 0 填充
.timeindex:时间戳索引 专门找index文件的
.snapshot:文件,记录了producer的事务信息
kafka的分区分配策略
RangeAssignor(默认)
以一个主题为单位 使用当前主题的分区数除以消费者数量
如果分配不均于 前面的往下多分一个区 一个组分配的区时连续的
如果分配不均于 前面的往下多分一个区 一个组分配的区时连续的
缺点:如果每个主题刚好都多一个 那么每次都是第一个消费者都会多分配一个 主题越多就会出现部分消费者过载的情况
RoundRobinASSignor
将所有消费者 以及 订阅的所有消费主题 按照字典序排序 然后轮询分配
相对均匀 差值不超过1 订阅多个主题相对数据比较混乱 适合单个主题
相对均匀 差值不超过1 订阅多个主题相对数据比较混乱 适合单个主题
Sticky粘性的
0.11新增 均匀 不变 订阅哪些主题 消费哪些 进可能保持均匀
ODPS
一体化大数据平台、开始只是一个计算引擎后面的生成一体化平台、数据的存储、调度、治理、处理、机器学习、用于离线计算的
Tunnel
提供高并发的离线数据上传下载服务
SQL
只能以表的形式存储数据
UDF
自定义函数
MR
计算引擎
Gruph
图计算
面试真题
双架构
广告组
广告端数据、投放广告用户点击广告下载游戏或者打开游戏、通过广告产生的行为、会计算一些次流、回流
第三方提供数据
doris、CK、Flink
流批一体
数据组
取得是游戏的服务端数据、用户在游戏里面产生的数据、用户的活跃、登录啊、注册
需要对比数据准确性是否一样、看是否是第三方提供数据的问题、有时候需要同时跟开发同一张表、对比结果
数据治理
java写数据质量管理的后端平台
你在你们组中担任的是什么角色
我们当时没有分的太细、像我的话前期的架构设计、大表的数据拉取(订单表、处方表、咨询的信息表)、数据宽表设计(支付成功的宽表)、任务的压测我都有接触过、我们当时人很少基本就要全链路都要会才行、不仅仅只限于数据的指标一个分析、我们组长每次就去对接需求、然后汇报进度、会有进度表的、几号开发完、几号测试等这些。因为一开始就只知道粗粒度的时间、多久开发完、我们每次汇报的是更细粒度的时间。然后组长会给我们分配任务、尽可能把任务跟任务拆成没有依赖关系的。会给我们排期、我们也会每天写进度、完不成的提前沟通、遇到业务问题多提出来解决。
接到一个需求、怎么评估这个需求的、需要接什么数据、时效性要求、达到什么样的效果
病例趋势分析
实时监控病例的趋势、分布、走向,可以在短时间病毒爆发发现并控制疫情和一系列的推荐、针对性的广告等等。我们需要实时从源头获取数据包括患者信息、咨询时间、疾病类型、诊断结果、地理位置等这些信息。1.初始化流环境、通过flink连接数据源。2.先将数据转为一个DataBean。3.将诊断结果和患者信息进行一个连接,没有连接上的和迟到的数据进行一个异步的数据库查询。4.将地理位置进行一个集成通过广播的一个地理位置维度表进行一个连接省市区、如果数据没有的、我们会去高德API查询先添加到数据库在进行一个补全。5.先根据地理位置(粒度区可以进行一个上卷)、疾病类型、医生ID、患者ID(进行一个去重)分组,进行一个聚合。6.设置水位线、根据地理位置、疾病类型分组、窗口大小1分钟、聚合,统计出每个区的每种疾病类型的数量、通过过滤器筛选出特殊的病例、新病例,流行病例、分别进行一个特殊标记、输出到CK里面。7.CK里面进行一个城市的维度上卷统计每个省份每个城市每个地区的不同类型的病例数量分别展示、在将新病例的城市标记出来随时观测新病例的走向、扩展城市和增长率、特殊病例同样也是标记出来、还有流行病例的走向城市也算出来展示、在与上一个五分钟进行一个相同地区的相同病例比较、算出病例的增长率、还可以分析诊断结果进行一个临床诊断分析,治疗建议提供给医生提出治疗不会出问题的方案。8.最后将这些数据查询出来通过可视化界面superset展示出来,也会写接口交给后台调用交给专家分析。
kafka如何避免消息丢失
kafka由Producer、broker、consumer三个端组成、只要每个端数据得到保证、数据就不会丢失。producer:1.把异步发送改成同步发送,这样producer就能实时知道发送的结果、2.添加异步回调函数来监听消息发送的结果、如果发送失败,可以在回调中重试。broker:只要数据写到磁盘、基本不会数据丢失。使用ISR副本机制+ACK机制实现消息的可靠性,ISR队列里面有一个leader+多个follower组成、follower回去同步leader的数据、主备切换也可以用。ACK机制的有个应答机制0、1、-1、-1的时候是所有副本全部同步才返回消息给producer。consumer:我认为只要前两个只要不出问题、基本上消费就不会问题
Prometheus监控是你自己调的嘛
Prometheus
Prometheus
1.启动Prometheus
prometheus --config.file=/opt/zql/prometheus-2.30.3/prometheus.yml
2.进入页面
node01:9090
3.配置数据采集
4.配置数据拉取
vim /opt/zql/prometheus-2.30.3/prometheus.yml
5.重启
6.页面
Alerts
警报
Graph
查询任务
prometheus_http_request_duration_seconds_bucket{job="任务名"}
Status--Target
节点状态
Status--Configuration
配置
Status--Rules
定义规则
Grafana设置模板
https://grafana.com/grafana/dashboards/
1.左边Data Source栏选择Prometheus
2.上方搜索 选择喜欢的模板
3.点进模板 下面有个ID 记住!
Grafana数据可视化
点击设置--dataSource--添加dataSource--prometheus
用户自定义--使用模板--输入刚才的Id--选择数据源
Spark用RDD写的还是SQL
一般在调度工具上写SQL、效率会很高
Spark的Shuffle的过程是什么样的
大数据分而治之 多个节点都跑着任务 指在数据处理过程中,需要重新分布、重组或重新组织数据,以便将数据合并或分发给不同的节点进行进一步处理 中间过程是Shuffle 一个节点任务之间产生Shuffle只会产生磁盘I/O 多个节点任务之间之间产生Shuffle会产生网络 I/O和磁盘I/O Shuffle可能会产生数据倾斜
Hash Shuffle
不要求数据有序 将数据Partition 好,优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用 buffer
Sort Shuffle
引入了MR的shuffle机制 将所有的 Task 结果写入同一个文件,并且对应生成一个索引文件 只需要根据索引找文件、分区、排序、溢写、小合并
bypass SortShuffle
先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件、根据未优化的HashShuffle差不多、只是要合并
了解过Spark源码嘛
我们公司处理的数据量比较大、Spark会有一些错误、可能需要通过源码了解真正的原因是什么
yarn集群
1.提交命令
SparkSubmit进程
SparkSubmitArguments类
正则表达式解析提交命令的参数
master、cluster
YarnClusterApplication类
Client类
yarnClient:1.提交submitApplication到
RM
2.启动AM进程
Driver线程
3.根据提交的参数启动driver线程、初始化spark上下文
8.注册成功
YarnRMClient
4.注册AM、申请资源
ThreadPool
ExecutorRunnable
6.启动Executor
driver
7.注册Executor
ExecutorEnv
MessageLoop
inBox
Executor
5.返回资源列表给AM
组件通信
spark以前是基于akka的、spark1.0X~spark2.0X中间加入的netty、后面1.6xnetty完全替换了akka、2.x完全抛弃了akka、使用netty的RPC计算机通信协议、用于分布式远程调用和服务的方法、用Netty实现的
有做过Spark相关的调优嘛
1.避免创建重复的RDD、对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据
2.尽可能复用同一个RDD、比如现在一个kv形式数据包含了单v的形式数据、我们尽可能就用kv形式的数据、减少RDD的数量
3.对多次使用的RDD进行持久化、SparkRDD原理是每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍、如果持久化我们就可以从内存或者磁盘获取、减少计算
4.尽量避免使用shuffle类算子、因为shuffle会落盘产生磁盘I/O还可能产生网络I/O、性能极差
5.预聚合操作、在每个节点本地对相同的key进行一次聚合操作、大大减少需要拉取的数据数量从而减少磁盘I/O和网络I/O
6.使用高性能算子、reduceByKey、mapPartiton、foreachPartitions处理整个分区、而不是一条一条处理
7.广播大变量、广播的变量,会保证每个Executor的内存中,只驻留一份变量副本、Executor的task共享
8.使用kryo优化性能
9.优化数据结构、java中三种对象非常吃内存、对象、字符串、集合,
学校学过哪些课程?
计算机网络技术、java、计算机组成原理、互联网程序设计、数据库系统原理
kafka偏移量是在Flink怎么存储的
将偏移量记录在 state 中,与下游的其他算子的 state 一起,经由 checkpoint 机制实现了“状
态数据的”快照统一
态数据的”快照统一
仍然可以在Kafka中保留consumer_offsets主题,以备将来使用或与其他系统共享偏移量信息
Flink实现去重
1.布隆过滤器
2.FLink状态
使用 KeyedState 或 OperatorState 来保存已知的数据。当新数据到达时,您可以检查状态以决定是否接受或拒绝该数据。
3.FlinkSQL
5.Flink CEP
定义模式和规则,以过滤和检测特定的事件序列,从而更好地处理重复事件。
Flink内存管理
1.10重大改动
进程内存
Flink内存
堆内存
框架内存
Flink框架本身使用的内存
task内存
task任务所需要的内存
堆外内存
直接内存
框架内存
task内存
网络缓冲内存
数据传输的时候需要的内存
管理内存
用于排序、哈希、缓存中间结果、RockDB的本地内存(RockDB的申请和释放由自己管理)
JVM元空间+JVM执行开销
Flink集群有哪些角色
有一个作业管理器JobManager主要协调分布式运行 调度任务协调checkpoint,协调失败任务的恢复、任务管理器Task Manager 负责执行任务处理数据 启动后 TM会向资源管理注册 Slot 然后交给JobManger调用、JobManager又包括ResourceManager负责资源的分配和管理 一个集群只有一个 分发可用的任务槽、JobMaster相当于YARN中的ApplicationMaster组件 负责处理单独的任务、Dispatcher负责启动JobMaster
Flink窗口
窗口就是无界流转为有界流进行批计算、窗口应该理解成一个桶、窗口把有流切割成有限大小多个存储桶、数据会分发到对应的桶中、当窗口到达结束时间触发计算。动态创建—当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。窗口可以基于时间和计数、窗口又分滑动、滚动、会话、全局(自定义)窗口。
窗口机制
1.首先是否分组 分组每个组达到要求才能显示 不分组只有一条主流 主流达到要求进行显示
2.windowAssigner:分配器类型-->定义如何将数据流分配到一个或多个窗口 基于时间的滑动 滚动 会话 基于计数的滚动 滑动
3.Trigger:指定触发器类型-->满足什么样的条件触发计算 触发时机 每一个窗口分配器都有一个默认的触发器
4.Evictor:数据剔除器 在触发器触发之后 窗口函数使用之前或之后 从窗口中清除元素
5.Lateness:时延设定-->是否处理迟到数据,当迟到数据达到窗口是否触发计算
6.OutPut:输出标签-->通过getSideOutput将窗口中的数据根据标签输出
7.window Funtion:窗口上处理逻辑 增量 全量聚合
2.windowAssigner:分配器类型-->定义如何将数据流分配到一个或多个窗口 基于时间的滑动 滚动 会话 基于计数的滚动 滑动
3.Trigger:指定触发器类型-->满足什么样的条件触发计算 触发时机 每一个窗口分配器都有一个默认的触发器
4.Evictor:数据剔除器 在触发器触发之后 窗口函数使用之前或之后 从窗口中清除元素
5.Lateness:时延设定-->是否处理迟到数据,当迟到数据达到窗口是否触发计算
6.OutPut:输出标签-->通过getSideOutput将窗口中的数据根据标签输出
7.window Funtion:窗口上处理逻辑 增量 全量聚合
Flink水位线
度量事件时间的、数据流中的标记、水位线主要基于数据的时间戳生成的、表示当前事件的进展、水位线是单调递增的、周期性就是不断的发射水位线、尝试触发触发器。往往跟窗口配合、对乱序的数据正确处理。延迟并不是其它数据会进来 只是延迟你这个窗口 等你的数据进来。上游传给下游要以最小的那个作为当前任务的事件时钟。每个任务都以“处理完之前所有数据”为标准来确定自己的时钟
时间语义
数据真实产生的时间--事件时间
数据进入flink程序的时间--摄入时间
flink正在处理数据的系统时间--处理时间
Flink端到端的一致性
source端
能够进行重复消费 将偏移量记录在State中 与下游的算子一起 经过checkPoint机制 实现快照统一 我们只能说事件发生多次 我们只能反映一次给状态后端一次 有效一次
least+去重
每个算子维护一个事务日志 跟踪已处理事件 重放失败事件 在进入下一个算子之前 删除重复事件
转换
基于分布式快照算法 实现了整个数据流中各算子的状态数据快照统一
sink端
数据不能重复 采用幂等写入方式任意多次向一个系统写入相同数据,只对目标系统产生一次结果影响 结合事务和 checkPoint机制 保证只对外输出保证一次影响
预写日志
不支持事务的存储系统 使用预写日志 不能百分百精确一次 它有二次确认 当数据成功写入 会再次确认相应的检查点 需要将二次确认持久化 用于后面的故障恢复 有确认消息才能保证数据成功 缺点就是二次确认 检查点和写入都成功 确认消息失败 会导致重复写入
两阶段提交
第一阶段消费数据时 协调者向所有参与者发起是否可以执行预提交操作,等待所有参与者的响应 所有参与者执行事务操作 存储checkPoint并持久化 如果参数者事务操作执行成功 对协调者返回同意 反之 返回终止 第二阶段 协调者获取到所有消息 都是同意才发出提交请求 参与者完成操作 释放事务期间占用的资源 向协调者发送事务完成消息 协调者反馈消息 完成事务 如果一个失败或超时了发出回滚请求 参与者进行最近的checkPoint回滚 释放事务期间占用的资源 向协调者发送回滚完成消息 协调者反馈消息 取消事务
Flink的CheckPoint过程是怎么样的
CheckPoint是Flink的核心容错机制,允许系统定期地保存数据流的中间状态,并在发生故障时从这些状态恢复。Checkpoint 是分布式快照,包括了所有数据源的状态信息。通过使用Checkpoint,Flink 可以确保在任务失败后能够从先前保存的状态中继续处理数据,而不会发生数据丢失或重复处理。检查点的保存是周期性触发的,间隔时间可以进行设置。我们应该在所有任务(算子)都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。如果出现故障只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。目前只有RockDB支持增量快照,可以检查耗时。对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID。
分布式快照算法(Barrier对齐)
分界线对齐”操作,也就是需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。
通常用于有界流的情况,当你知道所有任务都处理相同数量的数据时
分布式快照算法(Barrier不对齐)
用于那种任务的处理速度以及处理的数据量不同的可以用这个
参数
最小间隔时间(minPauseBetweenCheckpoints)
最大并发检查点数量(maxConcurrentCheckpoints)
开启外部持久化存储(enableExternalizedCheckpoints)
检查点连续失败次数(tolerableCheckpointFailureNumber)
非对齐检查点(enableUnalignedCheckpoints)
对齐检查点超时时间(alignedCheckpointTimeout)
Flink反压
原理
Flink内存数据结构
内存段
在Flink内部叫MemorySegment,是Flink中最小的内存分配单元,默认大小32KB。
堆内
Byte数组
堆外
网络缓冲
内存页
是内存段之上的数据访问视图
Buffter
网络缓冲区、申请和释放由Flink自行管理、一个Buffter包装一个内存段
Buffter资源池
用来管理Buffter、每一个Task拥有自己的资源池、由工厂创建
产生的原因
1.生产速度大于消费者的速度
2.某个算子或任务处理速度较慢、可能会成为整个任务的瓶颈导致反压
3.flink任务没有足够计算资源、也会产生反压
4.任务链中有太多的算子或任务、数据需要在多个任务之间传递、延迟增加引发反压
5.窗口大小需要设置合适、如果设置不合适也会造成反压
6.数据分布不均匀、导致某个subtask处理更多的数据导致反压
排查前,先把任务链禁用,每个算子都会单独执行,而不再成为一个链式结构。方便定位
通过WebUI在BackPressure查看是否反压
查看是否数据倾斜
通过webUI定位到多个task的subTask判断是否倾斜 如果某个subTask接收数据量大于其它的 基本数据倾斜了
解决
keyBy后造成的数据倾斜
先通过状态进行一个进行一个搜集 存入状态 进行一个聚合 然后在发送下去
keyBy 之前发生数据倾斜
使用其它的重新分区函数
keyBy 后的窗口聚合操作存在数据倾斜
key拼接随机数前缀或后缀,进行keyby、开窗、聚合
使用火焰图分析查看JVM的CPU火焰图、看顶层的哪个函数占据的宽度最大
针对特定的资源调优Flink通过增加并行度或增加集群中的服务器数量来横向扩展
分析 GC 情况、通过WebUI点击Stdout可以看到日志、下载下来就可以看到单个 Flink Taskmanager 堆总大小、年轻代、老年代分配的内
存空间、Full GC 后老年代剩余大小
存空间、Full GC 后老年代剩余大小
Source 端数据读取性能比较低或者 Sink 端写入性能较差,需要检查第三方组件是否遇到瓶颈
Flink平时用到的一些算子
map一进一出、flatmap一进多出、keyBy分组、aggregate聚合、reduce聚合、union合并、connnect共享、process最多、最底层
Flink优化状态后端
使用RockDB开启增量检查点、开启本地恢复、增大 block 缓存命中率高、增大 write buffer 和 level 阈值大小(320~640MB)、增大 write buffer 数量、增大后台线程数和 write buffer 合并数、开启分区索引功能
Flink的周边生态
Gelly是一个库、用于图处理和分析、提供了一些图算法的API、用于解决复杂的图问题
FlinkML是Flink的机器学习库,用于构建和执行机器学习模型。它提供了各种算法和工具,以支持在Flink上进行机器学习任务
CEP库用于处理复杂事件序列,例如在金融领域的欺诈检测。它支持定义模式和规则以识别事件序列中的模式。
TableAPI&SQL、允许用户在Flink上使用SQL查询来操作数据流和表
Connector具有与各种数据存储和消息传递系统连接器、以便于系统集成
FlinkSQL CLI、用于Flink SQL的命令行界面,允许用户以SQL方式查询和处理数据
Flink DashBoard、一些Flink管理和监控工具,如Flink Web UI,允许用户实时监控和管理Flink作业
Flink的状态
算子分为有状态和无状态、无状态根据当前输入输出结果、有状态就是需要历史数据进行一个计算得到结果。状态分托管和原始状态、托管就是Flink帮我管理、原始就是自定义 自己开辟空间进行管理。托管状态又分算子状态和按键分区状态。在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。但是用一个进程去所有状态扫描看是否过期、太耗资源、所有就出现设置失效时间、只支持处理时间。状态的存储、访问以及维护,都是由一个可插拔的组件决定的、这就是状态后端。
按key分区状态
可以通过富函数类自定义key状态、从某个角度讲、Flink所有算子都是有状态的、每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。
值状态(ValueState):只保存一个值
列表状态(ListState):与集合相似
Map状态(MapState):kv形式
归约状态(ReducingState):合并更新
聚合状态(AggregatingState):同归约、只不过更加灵活
算子状态
当前并行度的算子共享一个状态
列表状态(ListState)
联合列表状态(UnionListState)
广播状态(BroadcastState)
状态TTL
.setUpdateType():指定了什么时候更新状态失效时间
.setStateVisibility():清除操作并不是实时的,当状态过期之后可能继续存在,如果对它进行访问,能否正常读取到就是一个问题了
状态后端
1.13以前
1.13以后
恢复策略
Flink的CheckPoint失败的原因
1.首先存储问题、因为我们检查点是存储分布式系统HDFS上的(可以存储在其它的分布式数据库里面)、如果存储系统问题那么检查点就会写入失败
2.资源不足、如果当前作业的运行所需的taskManager内存不足或者磁盘空间不足、也会导致CP无法正常运行
3.网络问题、检查点需要在不同的TM之间传输状态数据、如果网络出现了问题可能也会导致数据传输失败、检查点也会失败
4.作业逻辑问题、作业本身的逻辑问题导致无法正确处理状态保存和恢复、也会导致检查点失败
5.作业超时、如果barrier屏障堵塞了导致检查点超时、系统也会认为失败
解决的话可以通过flink的日志、以及一些监控
Flink的CheckPoint的大小
Flink的Checkpoint保存了应用程序的状态、元数据和配置信息,以确保在发生故障时能够快速且正确地恢复应用程序的状态,从而保证数据的一致性和容错性、我们检查点一般1分钟发送一次、检查点的大小约300M
Flink读取kafka的数据一般配置什么参数
获取kafka的节点、配置kafka的主题、配置分组ID、配置偏移量、配置序列化器
Flink遇到的难解决的问题
flink任务更新如何上线
保证数据不丢失和数据不重复消费
1.你得先对新任务进行测压
Flink如何做压测
主要测:吞吐量、延迟、稳定性。
1.明确定义性能指标和目标
2.数据贴合真实数据
把前几天的历史数据拿出来跑
3.如果没有达到理想状态 开始调优
4.开始测试 然后测试出吞吐量、延迟、稳定性
5.持续监控
监控工具和日志记录来及时发现和解决性能问题
6.负载测试
增加负载,模拟高负载情况下的性能表现
7.文档和报告
记录测试结果和性能优化的措施,并生成性能测试报告
2.程序中我们最好使用UID给每个算子命名、以便savepoint升级,如果没有指定,flink会自动生成(setUidHash函数,从启动日志中获取算子id的hash进行恢复)
3.找到旧任务
flink list
4.停止Job,并将状态写入Savepoint
flink cancel -s ca1f3ac0081711ee6a0767fe1fd5b31c
5.对于还在处理的很大状态的实时任务 会有影响、需要检查这次 Savepoint 目录文件是否可用
可能会有保存到HDFS上失败 任务会启动不起来
可能会有保存到HDFS上失败 任务会启动不起来
6.从指定的Savepoint启动Job
flink run -s savepoint的路径
7.界面提交直接指定savepotin的路径即可
Flink有没有遇到过内存不足的问题
连表的时候出现数据倾斜、可能需要大量的内存来存储倾斜键的数据处理任务导致内存不足。就会频繁触发垃圾回收,以释放内存。这会导致任务的处理速度下降。直到内存溢出、JVM抛出OOM,导致任务失败。flink有会尝试重新启动该任务,但是内存问题仍然存在,任务可能会在短时间内多次失败和重启,导致不稳定的状态、可能还会数据丢失等一系列问题.当时我先去Flink日志去查看的,发现抛出的异常OOM错误。我就给当前任务的内存调大。然后重新跑了下任务,后面我将静态表进行了一个广播变量也是分批广播,动态表进行一些连接。一旦分组或者连表了我就把并行度调大成2倍。加随机盐让他们落到不同分区,实现一个负载均衡
用Flink读取kafka 与 JavaAPI读取kafka有什么区别
Flink提供了内置的Kafka连接器,可以轻松集成Kafka、Java API直接读取Kafka需要自行处理很多细节,包括消息的分区、偏移量管理、容错性等
MarkDown
通常用于创建富文本文档,如网页、文档、博客文章
Java的==与equals
==用于比较两个对象的内存地址是否相同、基本类型的包装类、例如int自动缓存了-128~127、当你数据的地址值在这个范围内就是true、equals是一个用于比较对象的内容是否相同的方法
Java中怎么去避免死锁、死锁的原因是什么?
死锁是一种编程情况、死锁情况出现至少两个线程或更多资源、产生死锁的四个必要条件、只要除了互斥其它三个不满足一个就可以避免死锁、当四个条件均满足,必然会造成死锁,发生死锁的进程无法进行下去,它们所持有的资源也无法释放。这样会导致CPU的吞吐量下降。所以死锁情况是会浪费系统资源和影响计算机的使用性能的。那么,解决死锁问题就是相当有必要的了。
1、互斥: 某种资源一次只允许一个进程访问,即该资源一旦分配给某个进程,其他进程就不能再访问,直到该进程访问结束、非共享资源所必须的,不仅不能改变,还应加以保证
2、占有且等待: 一个进程本身占有资源(一种或多种),同时还有资源未得到满足,正在等待其他进程释放该资源。
解决
允许进程只获得运行初期需要的资源,便开始运行,在运行过程中逐步释放掉分配到的已经使用完毕的资源,然后再去请求新的资源
3、不可抢占: 别人已经占有了某项资源,你不能因为自己也需要该资源,就去把别人的资源抢过来。
解决
当一个已经持有了一些资源的进程在提出新的资源请求没有得到满足时,它必须释放已经保持的所有资源,待以后需要使用的时候再重新申请。这就意味着进程已占有的资源会被短暂地释放或者说是被抢占了
4、循环等待: 存在一个进程链,使得每个进程都占有下一个进程所需的至少一种资源。
解决
可以通过定义资源类型的线性顺序来预防,可将每个资源编号,当一个进程占有编号为i的资源时,那么它下一次申请资源只能申请编号大于i的资源
Java多线程中volatile 的作用
当一个变量被声明为volatile后,它保证了对该变量的读写操作都会在主内存中进行,而不是线程的本地内存。这意味着,当一个线程修改了这个变量的值,其他线程会立即看到最新的值,保证了可见性。
volatile 修饰的变量会禁止编译器和运行时环境对其进行重排序,这有助于确保代码的执行顺序符合程序员的预期。
JVM的垃圾回收过程
标记阶段
标记-清除算法: 首先标记出存活的对象,然后清除掉未标记的对象,释放内存空间。这是一种简单的算法,但可能会产生内存碎片。
复制阶段
标记-复制算法: 该算法也进行标记操作,但与标记-清除不同,它将存活对象复制到一个新的内存空间,然后清除旧空间中的垃圾。这减少了内存碎片,但会增加复制的开销。
标记-整理阶段
标记-整理算法: 与标记-清除类似,但它不是简单地清除未标记的对象,而是将存活对象整理到内存的一端,然后清除掉边界外的对象。这样也能减少内存碎片。
分代回收
新生代回收: 新创建的对象通常在新生代中,采用复制算法,将存活的对象晋升到老年代。
老年代回收: 对老年代的垃圾回收通常采用标记-整理算法。
并发垃圾回收
CMS(Concurrent Mark-Sweep): 这是一种并发垃圾回收算法,它在标记和清除垃圾的同时,允许程序继续执行。适用于需要低暂停时间的应用。
G1(Garbage-First): 与CMS类似,它将堆内存划分为多个区域,并在收集时采用类似复制算法的方式。
分区回收
ZGC: 这是一种分区回收算法,具有极低的暂停时间,并适用于大内存堆的情况。Shenandoah: 另一种具有低停顿特性的分区回收算法,可以用于大堆内存
Shenandoah: 另一种具有低停顿特性的分区回收算法,可以用于大堆内存
Hive跟传统的数据库有什么区别
Hive是构建Hadoop之上的一个数据分析工具、没有存储数据的能力,只有使用数据的能力、将HDFS的数据变成结构化化数据,这些结构化数据称为元数据存在MetaStore、HiveSQL通过Driver进行解析、编译、优化、最后执行MR来计算。
HiveSQL优化
1.谓词下推
先过滤在连接表、比如你查询的数据仅涉及到特定日期的分区,Hive 可以只加载这些分区的数据,而不是加载整个表的数据
2.投影下推
只选择查询所需的列,而不是加载整个表的数据。这可以减少 I/O 和网络开销
3.常量折叠
如果查询中包含常量表达式,在编译查询时对其进行求值,而不必在运行时执行。这减少了运行时计算的开销
4.分区裁剪
如果表使用了分区、可以确定哪些分区不需要扫描。这可以大大减少需要处理的数据量
Hive小表Join大表
先将小表读进内存 然后在map阶段进行一个表匹配、小表放不了内存的使用桶的mapJoin
先将产生数据倾斜的key单独使用mapJoin 其它的使用reduceJoin来实现 最后进行一个合并
Hive自定义函数
1.UDF映射,继承 GenericUDF 重写 initialize 初始化、evaluate 方法、getDisplayString 方法实现
2.UDTF一行对多行
继承的 GenericUDTF,重写 initialize 初始化、process 处理数据、close 关闭
3.UDAF多行变一行
先继承 AbstractGenericUDAFResolver 类重写 getEvaluator 方法,然后使用静态内部类实现GenericUDAFEvaluator 接口
hive会写msck嘛?
会写MSCK REPAIR TABLE、他用于修复表的分区目录、Hive表的分区目录需要与表中的分区保持同步,但在某些情况下,hdfsAPi写hive分区表的时候hive查询不到的数据的情况、分区目录可能会与实际的表分区不匹配,可能由于文件操作或者手动的文件移动等原因
CK经常用到的引擎
1.ReplacingMergeTree
以分区为单位去重、不在同一个分区不能去重的
2.SummingMergeTree
同一分组下的单维的预先聚合
3.AggregateMergeTree
多维的一个预先聚合
4.CollapsingMergeTree
以增代删、用一个标记来折叠删除
1行数 > -1行数:保留最后一行1数据 1行数 < -1行数:保留最后一行-1数据
1行数 = -1行数 如果 最后一行是1:保留第一行-1 和 最后一行1 如果 最后一行是-1:什么都不留
1行数 = -1行数 如果 最后一行是1:保留第一行-1 和 最后一行1 如果 最后一行是-1:什么都不留
5.versionedCollapsingMergeTree
解决了没有顺序操作 不管先写哪个都可以折叠
可以说一下CK里面两张分布式表Join的过程是什么样的嘛?
正常 JOIN,将查询发送到远程服务器。 为了创建正确的表,在每个子查询上运行子查询,并使用此表执行联接。 换句话说,在每个服务器上单独形成右表
使用时 GLOBAL ... JOIN,首先请求者服务器运行一个子查询来计算正确的表。 此临时表将传递到每个远程服务器,并使用传输的临时数据对其运行查询。
CK中Join表为什么消耗大,有了解过吗?
1.CK是分布式列式数据库、将数据水平分割成多个分区,并在不同节点上存储和处理这些分区。在进行Join操作时,需要在不同节点上协调数据的合并,这可能涉及到大量的数据传输,从而导致性能开销。
2.CK采用列式存储,以提高读取和分析性能。虽然列式存储非常适合分析查询,但在Join时,需要将多个表的列合并到一起,这可能需要对较多的列进行读取和处理。
3.在CK的分布式设置中,数据通常被复制到多个副本以提高可用性和容错性。Join操作可能需要同时处理这些副本中的数据,从而增加了计算和网络开销。
4.Join操作的性能也受到数据量和Join的复杂性影响。如果要Join的表非常大,或者Join条件非常复杂,那么操作的性能会受到挑战
有没有写CK的时候出现丢数据的情况?
1.网络问题
如果在将数据从数据源传输到 ClickHouse 过程中发生网络中断或数据包丢失,那么部分数据可能会丢失。
2.写入策略
ClickHouse 具有不同的写入引擎和策略,如 MergeTree 引擎等,它们的配置可能会影响数据写入的稳定性
3.写入失败
如果写入操作在 ClickHouse 中由于某些原因(如磁盘已满)失败,可能会导致数据丢失。
解决
监控 ClickHouse 的运行状态,特别是错误日志,以便及时发现问题并采取措施。
配置合适的 ClickHouse 写入引擎和策略。
如果你在 ClickHouse 中启用了数据复制(Replicas),那么需要确保复制配置正确,以防止数据丢失
数据库调优
熟练
1.查询性能优化、避免全表扫描
2.内存调优、分配足够的内存资源给数据库缓存,以减少磁盘I/O操作,提高性能
3.创建适当的索引以加速常见查询
4.使用分区表、表压缩等技术来改进物理结构
数仓建模
熟练
将企业数据组织成一种结构化格式,以便支持分析和决策制定、数据仓库建模的目标是创建一个有效的数据存储结构,以支持数据查询、分析和报告
一致性维度&一致性事实
一致性维度和一致性事实的使用有助于简化数据仓库的结构,减少数据冗余,并提高数据的一致性和可理解性
一致性事实
多个事实表之间共享的度量或指标
一致性维度
多个事实表之间共享的维度表
hive
底层
hive的底层就是基于hadoop的
调优
hive调优
设计好的模型事半功倍 合理的分区分桶减少数据倾斜 设置合理的MR的task数 优化存储数据块 挑选合适的数据存储方式
谓词下推 投影下推 常量折叠 分区裁剪
Join优化
小表join大表 小表join小表
先将小表读进内存 然后在map阶段进行一个表匹配
小表放不了内存
使用桶的map join
大表join大表
SMB是基于桶的map join 的有序桶
Skew Join
先将产生数据倾斜的key单独使用mapJoin 其它的使用reduceJoin来实现 最后进行一个合并
推测执行
利用更多的资源换取时间 空间换时间
并行执行
当Stage之间没有依赖关系时 我们可以进行并行执行
容器化技术
docker
用于打包应用程序及其所有依赖项成为容器镜像。这些容器镜像是独立于主机环境的,并可在任何支持Docker的主机上运行、提供了一种轻量级、可移植的方式来打包和分发应用程序
k8s
不创建容器,而是负责管理和编排已创建的容器、能够在多个主机上协调和管理容器的生命周期、提供了集群管理、负载均衡、服务发现、自动伸缩等功能,以确保容器化应用程序的高可用性和弹性
两个不同的概念但是通常结合使用、自我感觉就像altas和ranger都是数据治理的工具、一个是关注数据的生命周期、一个是关注数据的安全访问控制
使用Docker创建和打包容器镜像,然后使用Kubernetes来部署和管理这些容器。Kubernetes负责将容器分配到工作节点、监控其运行状态、自动扩展、滚动更新、管理网络和存储等
shell
经验
$n:$0 代表该脚本名称,$1-$9 代表第一到第九个参数,十以上的参数,十以上的参数需要用大括号包含,如${10}
$#:获取所有输入参数个数,常用于循环,判断参数的个数是否正确以及加强脚本的健壮性
$*:代表命令行中所有的参数,$*把所有的参数看成一个整体
$@:也代表命令行中所有的参数,不过$@把每个参数区分对待
$?:上一个命令是否正确运行 0 错误 1 正确
$#:获取所有输入参数个数,常用于循环,判断参数的个数是否正确以及加强脚本的健壮性
$*:代表命令行中所有的参数,$*把所有的参数看成一个整体
$@:也代表命令行中所有的参数,不过$@把每个参数区分对待
$?:上一个命令是否正确运行 0 错误 1 正确
if判断、for循环、case when判断、udf函数、正则
数据湖
都是构建数据湖的工具
hudi
经验
需要更强大的写入和增量处理功能,以及与 Delta Lake 格式的兼容性
专注于优化写入操作,提供了记录级别的插入、更新和删除,以支持大数据湖中的实时数据应用
支持 Delta Lake 格式,这是一种支持 ACID(原子性、一致性、隔离性和持久性)操作的开放格式
通过“COW”(Copy-On-Write)和“MOR”(Merge-On-Read)表格式来支持时间性数据和增量数据的管理
支持数据的分区,可以根据指定的列对数据进行分割
可以与 Apache Kafka、Apache Flink 等数据流处理框架集成,支持实时数据处理
lceberg
经验
更关注时间性数据和表管理,以及与开放格式的兼容性,Iceberg 可能是一个不错的选择
提供了数据表的概念,允许用户创建和管理表格化数据,以及表的模式
允许创建时间性表格,这些表可以捕获表中数据的历史变化,从而支持时间旅行查询
Iceberg 可以与多种存储后端集成,包括 Apache Hadoop HDFS、Amazon S3、Google Cloud Storage 等
Iceberg 使用 Apache Avro 格式,使其易于集成到各种大数据工具中
Iceberg 提供了对模式演进的支持,允许修改表的模式而不破坏现有数据
cdh
经验
集群自动化安装、中心化管理、集群监控、报警的一个工具、类似于一个360安全卫士、里面的360工具都是集成好了
1.明确你的需求
明确你的大数据需求,包括存储、计算、数据分析等方面、确定硬件和网络基础设施,包括服务器、存储、网络拓扑、决定使用的操作系统
2.安装和配置操作系统
3.安装java
安装所需的JDK
4.Cloudera Manager 安装
是 Cloudera 提供的大数据集群管理工具、需要安装 Cloudera Manager Server 和 Cloudera Manager Agent
5.Cloudera Manager 配置
启动 Cloudera Manager Server,访问 Web 界面进行初始配置、指定用于存储配置信息的数据库、选择 Cloudera Manager Server 和 Agent 所在的主机
6.CDH安装
通过 Cloudera Manager Web 界面选择要安装的 CDH 组件,如 Hadoop、Hive、HBase 等、Cloudera Manager 将帮助你下载和分发这些组件,并为你的集群进行配置
7.集群配置
配置 Hadoop 集群,包括 HDFS 的存储目录、YARN 资源管理器、Hive 数据仓库等
8.集群启动
使用 Cloudera Manager 启动你的 CDH 集群
9.测试和优化
运行测试作业和应用程序以确保集群正常运行。根据性能和资源需求进行集群优化
10.安全配置
配置集群的安全特性,包括身份验证、授权、加密等
11.监控和维护
设置监控和告警,以便及时检测和解决问题。定期备份和维护集群
12.数据导入
导入或上传需要处理的数据到 HDFS 或其他存储。
Yarn的调度器
Capacity:基于容量的调度器、允许管理员为不同的队列分配不同的资源、队列资源隔离
Fair:公平调度器、均衡分配资源、适用于共享集群
Fifo:所有应用程序被统一提交到一个队里中、顺序执行
维度建模在你们项目中的场景
1.维度建模原因是旨在提供用于报表、查询和数据分析的简单、易于理解的数据结构、ER建模满足不了这一点、主要的维度建模的方法主要就是选择业务、申明粒度、确定维度、确定事实。一个场景比如上面需求要看我这个每个地区诊断类型的数量这一个指标、我会先去将这个指标拆出来粒度、需要的维度、度量值、事实表类型等。2.声明粒度、粒度的话一条记录就表示一位用户的一次咨询病例类型、3.确定我的维度、根据维度的设计原则(维度属性尽量丰富 为数据使用打下基础 上游维度丰富 下游计算才会灵活、给出详实的 富有意义的文字描述、区分数值型属性 和 事实(分组的作为维度、计算的作为事实)、将尽可能多的通用的维度属性进行沉淀出来、直接把一些简单的维度放在事实表中、随着时间的流逝发生缓慢的变化的数据形成一张拉链表、为了提升效率 把常用的维度冗余到事实表) 我需要一个地区维度表根据维度的设计方法、选择维度或新建维度 维度在数仓里面要唯一、确定主维表(主维度表应该与多个事实表关联、形成一个维度建模的星座模型、不经常变化的)地区维度表应该算一个、确定相关维表(基于主维表的一些表、四川细分成都这些)、确定维度属性(维度表的字段了)。4.确定事实表、根据事实表的设计原则(尽量包含所有与业务过程相关的事实,即使存在冗余,由于事实通常是数字型,存储开销不会太大、只选择与业务过程相关的事实、分解不可加性事实为可加的组件(商品汇率=商品原价、商品优惠价格)、在选择维度和事实之前必须先声明粒度、在同一个事实表中不能有多种不同粒度的事实、事实的单位要保持一致(订单原价、订单优惠价格)、在数据库中,null 值对常用数字型字段的 SQL 过滤条件都不生效用0代替、使用退化维度提高事实表的易用性)、我现在可能就需要一张事务型的一张事实表、事务型事实表就是一旦插入不能修改、更新方式就是增量更新、当然还有累积型事实表、描述一条数据的生命周期、可能会有多个时间段、周期型、每周每月的显示。5.我们也是照阿里的一个分层方式、因为我当时看过一本阿里巴巴大数据实践一本书、一般分四层ods原始数据层、dwd数据明细层、dws宽表层、dim数据维度层、asd数据展示层、分层主要目的还是空间换时间 复杂问题简单化 减少重复开发。
kafka怎么分主题的、根据哪个字段、依据是什么
根据我们业务需求、根据里面的事件字段ID进行分主题、比如我们里面点击支付了、会有一个支付事件、可能就会分入到支付主题里面这一个情况。
需求就是现在需要本周的数据、实时需要怎么处理?
那可能我们把离线数据当成迟到数据、专门去拉取离线一周前的数据
Flink现在要读取前五分钟的数据
会进行一个开窗、用状态存储数据、用定时器进行触发、如果要连接的话可以用interval Join 进行一个上下界的连接数据、突然要的话那可能会专门跑一个任务单独重置offset用一个by-duration :把位移调整到距离当前时间指定间隔的位移处进行一个拉取数据吧
FlinkSQL的执行流程
1.SQL 和 TableAPI进入Flink之后先统一数据结构表达形式(Logical Paln)逻辑执行计划
2.Catalog会提供一些原数据信息 不是SQL本身的信息 用于后续的优化
3.Logical Paln经过一系列优化成Physical Plan物理执行计划
4.Physical Plan通过Code Generation翻译成 TransFormation
5.最后转换为JobGraph作业图 将作业图转换成物理流图 包含了所有并发执行任务
6.JobMaster向RM申请资源 将执行图分发到TaskManager上
2.Catalog会提供一些原数据信息 不是SQL本身的信息 用于后续的优化
3.Logical Paln经过一系列优化成Physical Plan物理执行计划
4.Physical Plan通过Code Generation翻译成 TransFormation
5.最后转换为JobGraph作业图 将作业图转换成物理流图 包含了所有并发执行任务
6.JobMaster向RM申请资源 将执行图分发到TaskManager上
Ck的数据存储的格式
CK是列式存储数据库、数据以表格的形式组织
HDFS存储系统跟linux的EXT4的系统分别
HDFS是一种分布式文件系统,主要用于大规模数据存储和分析,特别是在Hadoop集群上进行大数据处理、它将数据分布在多个计算节点上、数据是以块的形式存储的、也是以块的形式容错的、支持一次写入多次读取、使用日志先行确保数据的一致性。EXT4是一个本地文件系统是在单个服务器上、不是分布式的。
HDFS的namenode是做什么的
HDFS的NameNode是HDFS文件系统的管理者,它维护了文件和目录的结构以及相关的元数据信息,用于管理文件系统的命名空间和权限,并提供数据块的位置信息、在数据读取和写入的时候都要跟namenode进行一个交互、读取的话可能会去请求他告诉块的一次位置信息、写入的话也需要请求它告诉负载比较均衡并且保证块不在同一个机架上。由于它的重要性,HDFS通常采取容错和高可用性的措施,以避免NameNode成为系统的单点故障
MR的写map阶段需要继承哪一个类
需要继承一个mapper类、这是Hadoop MapReduce框架中Mapper的抽象类、需要重写一个map方法
Hive数据存储格式
textFile就是常见的文本格式、orc是列式存储格式与Hadoop环境中大多数框架兼容 性能高 读和写速度快、Parquet也是一种列式存储格式 适合嵌套数据结构 兼容性更强、Avro是一种数据序列化格式,可用于数据的存储和传输、SequenceFile是二进制文件格式,一般作为中间格式存储、可以解决小文件问题
Java中StringBuilder跟StringBuffter的区别
都是处理字符串的API、StringBuffer是线程安全的、因为它的方法都使用了锁 synchronized 修饰。这意味着在多线程环境下,多个线程可以安全地并发访问和修改同一个 StringBuffer 实例,而不会导致数据不一致的问题。StringBuilder是性能高、因为它不需要在每个方法上都获取锁。所以多个线程可以同时访问。常用的话基本都用StringBuilder、好像因为Java的编译器和JVM自动帮我们优化线程安全的问题
一个类继承了Set集合、遍历这个类、有哪些遍历方式
我知道的就是Iterator迭代器、增强for循环、forEach
java中泛型的作用
在集合中允许你创建类型安全的集合、以便更好地处理不同类型的数据,某些情况下可以提高性能、允许编译器进行更多的优化、避免没必要的类型转换、拆箱/装箱操作,明确了指定数据类型、代码的可读性也会提高,泛型也可以在编译期捕获类型错误、避免运行时出现类型不匹配的问题。
java中多线程实现的三种方式
继承Tread类创建run方法,实现Runnable接口实现run方法这种方式更灵活、避免的java单继承限制,还可以实现Callable接口和Future接口、允许线程返回结果并处理异常
java中通过JDBC连接数据库为什么完成操作之后要关闭数据流啊
为了确保程序健壮性和资源利用率、JDBC连接数据库会使用连接池、这个连接时有限的、如果不关闭会导致其它程序无法连接甚至数据库连接泄露
因为连接途中还有其它类比如ResultSet、StateMent、如果不关闭的话 会消耗系统的内存资源
还有在操作数据库时,可能会开启事务、需要调用commit提交事务、rollback回滚事务、如果不关闭、事务有可能会无法正确提交或者回滚
在有些情况、数据库会给查询或更新加锁、如果不关闭ResultSet和StateMent类、数据库一直保持锁状态、影响其它事务执行
java中main函数传入的args数组有什么作用
用于传递命令行参数、允许在java程序运行时传递一些配置信息或者数据、在某些情况下,你可能需要将数据传递给程序,而不是在代码中硬编码。命令行参数可以用于传递数据、文件名、URL、账户、密码等等、还可以在不修改程序代码的情况下,动态地改变程序的行为、
Flink调优的生产环境
我先说我们先发现问题需要调优说起吧、首先我们把Flink监控的东西全部都开了一个是End-to-EndExactly端到端的全链路追踪、还有一个就是Acquisition state delay获取状态延迟、先把这些开了 我们也可以在metrics看到它延迟的指标。像我们监控的话我们去看WebUI、我们首先去看有没有反压、那个框会变颜色、开始去排查哪个地方会产生反压(因为反压原因时生产速率大于消费速率嘛)、排查前需要先禁用掉操作链、方便排查、先看上游看是不是kafka速度太快、看是否需要加分区、下游的话看是不是CK扛不住了,需要加一个静态的流控、这是一个基本的情况。然后我们看是不是出现了数据倾斜、FLinkWebUI算子上的某一个SubTask看它的处理数据量特别多。有的话基本就出现数据倾斜了、数据倾斜的话可能会有几个通过的解决数据倾斜的方案、首先去看kafka里面是不是数据倾斜了、如果kafka数据源本身就数据倾斜了我们需要调整分区、不是的话我们就去排查flink里面的情况、flink里面分keyBy前和Key后的、如果KeyBy前的只能重新使用分区函数、如果是keyBy后造成数据倾斜了、可以先通过状态进行一个搜集 存入状态 然后在发送下午、如果是keyBy后窗口聚合造成数据倾斜了、我们会先UDF一个类找个BigKey进行一个加盐、后面在进行一个去盐。还有就是内存管理、FLink内存管理分推内跟推外、各自占多少百分比、如果一个占比比较大一个比较小的话可以把百分50%比例进行要给调整。还有就是flink任务里面提交的时候给他打上GC日志、然后通过GC工具对他进行一个GC的分析、看是不是又FUllGC、老年代卡着的话、可能会又大量的对象会进行回收。FLink还可以开启火焰图通过观察类的调用频率(a-b-c)有出现死循环的情况。一般的话基本不会有死循环的情况。还有就会分区数、一般要对应kafka的分区数、这样的话可能不会浪费资源、也不会说争抢并行度的问题。还有就是状态我们一般就是用RockDB状态、他是一个基于磁盘的存储引擎、底层使用了 LevelDB 和 Google 的 LevelDB Key-Value 存储库、它可以进行一个增量的检查点每次存储变化的部分、存储大状态。
Flink处理的数据量
我们处理的数据量平时的可能在每分钟20w左右、高峰的话能30多w左右、因为我们实时数据量不是太多、日活巅峰的时候打了300w日活量、但是我们的流失客户比较严重。
大数据体系最容易到达性能瓶颈的硬件是哪个?
kafka的话可能是磁盘和内存、Flink的话可能是CPU和内存、CK的话可能会是磁盘和CPU
Flink一个任务中事件时间窗口和处理时间窗口能并存嘛?
可以同时存在一个任务中
kafka能不与zookeeper并存嘛?
可以的、kafka在3.0版本准确的说2.8版本及以上、kafka引入了一个叫做 KRaft 的新控制器选项、替代了zookeeper简化了部署。kRaft将元数据信息存在内部主题里面。
两个客户端同时去访问HDFS同一个文件
HDFS 允许多个客户端同时打开并读取文件,因为读操作不会改变文件的内容,不会引起数据不一致、当一个客户端以写入模式打开文件时,HDFS 会以排他方式锁定该文件,不允许其他客户端同时以写入模式或追加模式打开同一个文件、多个客户端可以以追加模式打开同一个文件,这允许它们在文件末尾添加数据,而不会相互干扰。追加操作通常用于写入日志或事件数据
数据治理
原因
数据治理最根本的原因就是找数难、用数难、想找的数据不知道去哪找、特别是专业业务术语、找的时候相似表太多不知道用哪个、搜索的结果太多、需要逐一查看、搜索的结果不准、很多与业务无关、有些表命名也不规范、表的注释也太简单等等一系列问题。
如何建设
1.起步首先数据质量、数据质量主要保证数据的准确性和稳定性、准确性的话就是要确保这张的数据是没有问题的吧、符合我的要求、所以数据测试要做好。开发前可能是保证数据的准确性、开发后就要保证数据的稳定性、稳定性的话就是确保他的计算资源以及一些调度监控来保证。2.数据标准、可能会觉得数据标准在数仓的就做好了、但是像很少有最开始就做好了、都是后面慢慢的完善的、所以我觉得数据质量优先级会高、之后数据标准、数据标准可能就是命名的规范(更好的理解)、开发的规范(代码的可读性)、上线的规范(完全性)。3.当保证了数据质量、数据标准之后就是数据资产了、因为数据资产像一些就是资产管理、元数据管理这些都是让新人或者完全不熟悉的人更快速的理解业务、更快的上手、减少开发的成本、提高易用性。4.成熟阶段然后就是数据健康了、像一些计算的成本以及一些存储的成本、像一些用不到的表或者说长期用不到的表可以给他删除之类的。5.最后就是数据安全、权限的管理、生命的周期等等、像可以用一些Ranger做一些权限管理啊、或者其它的工具保证数据不被泄露。我这边可能想的是快速的解决根本问题以后在去加量的治理、如果因为业务中间阶段、或者说活下来的阶段去大规模的做治理、可能时间成本不合适、唯一能衡量数据治理团队的指标就是成本、数据治理是围绕的数据建模设计来产生的、一个不懂数据模型设计的数据治理就好像只能看表面、不能看内伤。
架构
通过数据源的hook(钩子)会将数据变动的元数据信息导入到kafka、Altas导入导出模块会将消费kafka的数据到Altas里边、Altas是不能存储数据的、只能基于其它存储库来做、图结构的数据是用图数据库JanusGraph放到存储库里面的。
WebUI
Lineage:查看血缘关系的
Schema:所选字段的血缘关系
右上角:数据资产
左边:数据字典
优化的点
元数据日志的管控和约束、dataWorks好像都没做
0 条评论
下一页