kafka (消息队列)
2023-11-27 13:55:36 11 举报
AI智能生成
kafka相关的概念和操作
作者其他创作
大纲/内容
消费端口的提交机制
__consumer_offsets中指针移动过程
消费提交结果
消息已提交
知道这条消息消费端已经消费完了,下次消费端重启,继续发送下一条消息给它
消息未提交
未超过保留时间(retention time)
下次消费端重启时再次发送这条未提交的消息给消费者
已超过保留时间(retention time)
消息本身仍然存在于broker上,但无法直接获取。只有在消费者请求拉取消息时,它们才能访问到这些超过保留时间的消息
消费端消息提交方式
参数enable_auto_commit
是否启用自动提交机制,默认是开启的
参数auto_commit_interval
执行自动提交的时间间隔,默认5秒
测试
默认自动提交
消费端程序重启后,之前已经消费过的消息是否会再次收到
结果是不会收到此前曾经处理过的消息
测试不提交
spring.kafka.listener.ack-mode用于配置Spring Kafka消费者监听器的ACK模式
record模式
在每条消息成功处理后自动提交偏移量
batch模式
在一批消息全部成功处理后自动提交偏移量
time模式
定期自动提交偏移量
count模式
在达到指定的记录数后自动提交偏移量
manual模式
手动提交偏移量
显式调用`Acknowledgment`对象的`acknowledge()`方法来提交偏移量,确保在适当的时候进行偏移量的提交
manual_immediate模式
手动提交偏移量,并立即提交
显式调用`Acknowledgment`对象的`acknowledge()`方法来提交偏移量,但在处理消息后立即提交偏移量
评价
手动提交:一个消息执行一次提交,更准确
自动提交:在一定的时间间隔之后,执行提交,存在一定的风险
例如:在时间间隔内,消费者宕机、网络不稳定等原因导致某些消息没有提交,进而导致消息重复投递
异常状态
在接收消息的过程中抛出异常而没有手动提交偏移量时,Kafka会将该消息视为未被成功处理,并尝试重新投递给消费者
Kafka会根据一定的重试策略来决定重新投递消息的次数
max.poll.retries
指定了在发生可恢复异常时,Kafka将尝试重新投递消息的最大次数,默认为10次
retry.backoff.ms
指定了两次重试之间的退避时间间隔,即每次重试失败后等待的时间,默认为100毫秒
等参数控制
如果在所有的重试尝试都失败后,消息仍然未被成功处理,那么这条消息将被视为无法恢复的错误消息,并且可能进入到死信队列等特殊处理机制中,具体取决于您的应用程序的配置
死信主题
借助死信主题机制来接收那些处理失败的消息,做一些后续的善后处理
注解
①@RetryableTopic注解
attempts
失败后重试次数
3
backoff
详情在@Backoff注解设置
@Backoff
autoCreateTopics
设置是否自动创建死信主题死信主题会自动在原主题名称后附加“-dlt”
true
②@Backoff注解
代码
监听死信主题
查看死信主题
kafka-topics.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --list
削峰限流
设置@KafkaListener注解的concurrency属性
Offset机制
kafka保存消息
给生产者返回ACK确认信息
消息持久化保存到硬盘上的日志文件中
Topic+Partition+Offset组合在一起可以唯一定位到一条具体的消息
日志文件(6个字段)
1.分区号(Partition)
表示消息所属的分区
2.主题(Topic)
表示消息所属的主题
3.消息的偏移量(Offset)
表示消息在分区中的唯一位置
4.消息的时间戳(Timestamp)
表示消息的发送时间或其他自定义时间戳属性
5.消息的键(Key)
可选项,某些情况下用于消息路由和分发
6.消息的值(Value)
消息的实际内容
消息举例
分支主题
__consumer_offset
数据结构
Topic
表示消费者组订阅的主题
Partition
表示消费者订阅的分区
Consumer group
消费者组的标识
分为两种
组内竞争(只一个能获取到消息)
组间共享(都会接收到消息)
Offset
消费者组再该分区中的当前消费进度,指示已经消费的偏移量
Metadata
其他与偏移量相关的元数据信息
HW(High Watermark)
每个分区的高水位线,表示已经被确认的最大偏移量
LEO(Log End Offset)
每个分区的日志结束偏移量,即下一条即将被写入的消息的位置
通过Topic和Partition的组合,定位到一个具体的分区
在__consumer_offsets主题中记录各个Partition中的消息对消费者来说消费到了哪一条
kafka-console-consumer.sh
--offset earliest
本次从Offset最小值开始消费,有可能导致消息重复投递
--offset latest
本次从Offset最小值开始消费,有可能导致消息重复投递
--offset 非负整数
本次从非负整数指定的位置开始消费
旧版本
--to-earliest
本次从Offset最小值开始消费,有可能导致消息重复投递
--to-latest
本次从最新的消息开始消费,而不会考虑之前已经消费的消息,可能会导致消费者跳过一些历史消息
urrent意思是当前,这里指当前已经提交的最新Offset
--to-current
current意思是当前,这里指当前已经提交的最新Offset
--to-offset 非负整数值
本次从非负整数指定的位置开始消费
--execute
上面的参数只是设定方案,必须再带上--execute才表示执行方案
命令行测试
--offset earliest
--offset latest
--offset 非负整数
消费者组查看偏移量
客户端原生API
生产者
创建主题
kafka-topics.sh --bootstrap-server 192.168.200.100:9092 --create --topic topic-java-client
启动消费者监听主题
kafka-console-consumer.sh --bootstrap-server 192.168.200.100:9092 --topic topic-java-client
引入依赖
java程序
send()方法返回Future类型的对象
可以调用Future的get()方法同步获取任务执行结果
代码示例
获取消息发送结果
消费者
kafka整合springboot
流程
生产者
pom
配置文件
启动类
配置类
运行后查看主题
发送消息
linux上监听
java发送消息代码
消费者
pom
配置文件
group-id必须指定(一个具体的consumer是存在一个组里面)
启动类
配置类
接收消息--接收不到就删除对应的Zookeeper下__consumer_offsets
deleteall /brokers/topics/__consumer_offsets
传递实体对象类型的消息
实体类
发送消息的方法
会出现的异常(java.lang.ClassCastException)
原因:目前使用的序列化器是StringSerializer,不支持非字符串序列化
解决办法:把序列化器换成支持复杂类型的
生产者分区策略
指定分区(以分区为准)
send方法中,Integer partition(分区)传递的值,方法中的第二个参数,必须传递key(即便是空),第二个参数才是partition
没有parttion的话(不指定分区),传递的key起作用
key值会首先基于murmur2hash算法,得到HASH值,然后HASH值对分区数量取模
分区和key都不指定
Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区
待该分区的batch(默认16k)已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
轮询分区器
轮询分区器:轮询
spring.kafka.producer下面:
properties:
partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner
测试代码
自定义分区器
实现自定义逻辑
spring.kafka.producer下面:
properties:
partitioner.class: com.atguigu.kafka.partitioner.AtguiguPartitioner
添加配置自定义类
测试
生产者拦截器
实现接口
mplements ProducerInterceptor<String,Object>
泛型说明
- K:发送消息时指定的key
- V:发送消息时消息数据本身
重写方法
onSend()
消息发送前执行(已经调用了 KafkaTemplate 对象的 send() 方法,但是消息还没有发送到 broker)
onAcknowledgement()
Kafka服务端应答后,应答到达生产者确认回调前执行
close()
拦截器对象销毁之前执行
配置类加上
@PostConstruct注解
被修饰的方法执行时机:Web环境下Servlet对象创建完成,初始化操作之前
对被修饰方法的要求
不能有参数
不能有返回值
必须是public权限修饰符
用之前方法测试
生产者数据有序
Kafka 最多只保证单分区内的消息是有序的
所以如果要保证业务全局严格有序,就要设置 Topic 为单分区
但是,某批次的数据发送失败后,进行了重试,也可能出现后边的批次先于它到达的情况
解决方案
需要有序的消息发送到同一个分区
实现方式一:直接指定相同的partition值
实现方式二:不指定partition,而是指定相同的key
设置retries值为0
有数据丢失的风险
测试
配置文件
spring.kafka.producer下面:
retries: 0
测试同分区代码
调用 get() 方法(能保证)
并不是为了获取任务执行的结果
而是确保第一个任务执行完成之后,再执行第二个任务
如果不这样做,各个子线程负责发送消息没办法保证发送消息的顺序
测试不同分区代码
无法保证有序
- 在生产者端发送消息之前,把消费端程序停止
- 把消息全部发送到broker之后,再启动消费端程序接收消息
结果是有一定顺序
生产者ACK确认
ack
为了确认消息发送结果,我们需要引入ACK确认机制
acknowledge单词的缩写
spring.kafka.producer.acks可配置值
0
生产者发送数据后就不管了,不会等待broker的ack,这个延迟最低但是存储的保证最弱。当server挂掉的时候就会丢数据
几乎不用
1
默认值
生产者会等待ack值 ,leader确认接收到消息后发送ack,不需要follower确认。但是如果leader挂掉后他不确保消息是否同步到了所有的follower中,新leader也会导致数据丢失,可靠性中等,效率中等
一般用于传输普通日志,允许丢个别数据
-1(all)
- 生产者会等所有的follower的副本受到数据后才会收到leader发出的ack,也即Leader和ISR队列里面所有Follwer应答,可靠性最高、效率最低
一般用于传输重要不能丢失的数据(例如:钱、订单、积分等),对可靠性要求比较高的场景
如果没有接收到ack,生产者端会考虑参照retries参数执行重试操作
生产者事务
提出问题
要么都发送到消息队列
要么都不发送
例子
下单成功后,把以下消息存入消息队列:
send()方法
一个事务中所有send()方法都执行成功,所有要发送的消息都存入缓存了,再一起往broker发送
分支主题
生产者端事务仅仅对消息是否全部进入缓冲区进行管理,至于消息在网络上传输过程中是否会丢失就需要借助ack和retries机制了
kafka
对应的spring.kafka.producer下的配置
retries必须大于0
acks必须设置为-1(或all)
测试代码
消息重试机制
事务只负责保证消息存入缓冲区,已经存入缓冲区的消息都是确定要发送到broker的
但是从生产者到broker需要经过网络,broker也有宕机的风险
所以事务提交并不能保证消息一定能写入broker
数据发送可靠性水平
①At Most Once(至多一次)
②At Least Once(至少一次)
③Exactly Once(精准一次)
幂等性
广义理解
个操作执行一次和执行多次对系统的影响是一样的,执行多次不会破坏数据完整性
狭义理解
不论Producer向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复
总结
在Kafka中可以通过事务、重试机制和幂等性等特性来实现类似于"Exactly Once"的效果
风险和应对方式总结
风险
应对方式
效果
业务代码出错,导致在一个逻辑整体中的消息部分发送、部分没有发送
事务
消息要么都发送
要么都不发送
网络传输过程中消息丢失
重试
多次重试保证消息发送成功
重复消息导致数据计算错误
幂等性
每次操作都针对一个具体的id执行对其它数据没有影响
broker实例宕机导致无法接收消息
集群
集群中包含多个broker实例,避免单点故障
数据分区:给数据分区配备复制分片
火山爆发、地震、海啸、火灾……
跨区域容灾
比如:东京地震了,但是我们在北京有跨区域容灾机制
kafka集群
注册到同一个Zookeeper上就代表它们是同一个集群的
Kafka通过broker.id来区分集群中的不同节点
构建集群
分支主题
端口号
配置文件
日志目录
实例01
6000
/opt/k-cluster/server6000.properties
/opt/k-cluster/log6000
实例02
7000
/opt/k-cluster/server7000.properties
/opt/k-cluster/log7000
实例03
8000
/opt/k-cluster/server8000.properties
/opt/k-cluster/log8000
创建目录
mkdir -p /opt/k-cluster/log6000
mkdir -p /opt/k-cluster/log7000
mkdir -p /opt/k-cluster/log8000
复制配置文件
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server6000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server7000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server8000.properties
修改配置文件
6000端口
7000端口
8000端口
启动集群实例
验证端口(lsof -i:端口号)
启动失败
虚拟机内存不足
增加内存
关闭一些进程
修改对应启动脚本程序中的内存大小
Zookeeper
zookeeper-server-start.sh
KAFKA_HEAP_OPTS
Kafka
kafka-server-start.sh
KAFKA_HEAP_OPTS
停止集群
使用集群
创建主题
查看主题
集群消息发送
集群消息消费
集群消息接收的时候,收不到
方案一
集群查看
方案二
第一步:把apache-zookeeper-3.9.1-bin.tar.gz上传到Linux系统/opt目录下
第二步:解压apache-zookeeper-3.9.1-bin.tar.gz文件
cd /opt
tar -zxvf apache-zookeeper-3.9.1-bin.tar.gz
第三步:运行zkCli.sh脚本文件,登录到Zookeeper服务器
/opt/apache-zookeeper-3.9.1-bin/bin/zkCli.sh
第四步:删除__consumer_offsets主题
deleteall /brokers/topics/__consumer_offsets
第五步:退出Zookeeper
quit
第六步:重启
先关闭然后重新启动Zookeeper
先关闭然后重新启动集群各实例
Kafka图形化监控Eagle
1.搜索镜像
docker search efak
2.下载镜像运行容器
使用
访问
http://192.168.200.100:8048
登陆
可视化界面查看broker实例
新建相关主题
注意:Kafka集群中broker实例的数量需要大于等于复制因子(Replication factor)
主题列表中可以看到对应的broker拥有主题,主题中可以看到对应的分区,分区中可以看到对应的消息
引入
业务逻辑(例子:下单功能)
同步:串行(多功能)
每一步都满足才有返回
响应时间长
并发压力传递
系统结构传递不足
异步:解耦
存入消息队列就能返回结果(异步处理)
添加新功能植入(应用解耦)
流量削锋
不同进程(process)之间传递消息
两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程(要做隔离)
某一个进程接受的消息太多,一下子无法处理完,对收到的消息进行排队
Apache Kafka
是一个分布式流处理平台,具有高吞吐量、低延迟和可靠性等特点
它广泛应用于实时数据处理、日志收集、消息队列等场景
日志收集
一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer
消息系统
解耦生产者和消费者、缓存消息等
用户活动跟踪
用来记录web用户或者APP用户的各种活动,如网页搜索、搜索、点击,用户数据收集然后进行用户行为分析。
运营指标
Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
流式处理
比如Spark Streaming和Storm
一款基于zookeeper协调的分布式消息系统
扩展
基于JMS的产品
JMS: Java Message Service API(Java消息服务)只能在java中使用
Apache ActiveMQ:比较成熟的产品
RocketMQ:阿里巴巴产品,目前交由Apache基金会
基于AMQP的产品
AMQP:Advanced Message Queuing Protocol(高级消息队列协议) 基于协议的,可以跨语言使用
RabbitMQ:erlang语言开发,稳定性好,响应快。
消息分类
Peer-to-Peer(Queue)
一般基于Pull或者Polling 接收消息
发送到队列中的消息被一个而且只有一个接受者接受,即便有多个接受者在同一个队列中侦听到同一个消息
即支持异步“即发即弃”的消息传送方式,也支持同步请求、应答的传送方式
发布/订阅(Topic)
发布到一个主题的消息,可以被多个订阅者接收
发布、订阅可基于Push消费数据,也可以基于Pull或者Polling消费数据
解耦能力比P2P模型更强
消息队列产品
分支主题
RabbitMQ
ActiveMQ
RocketMQ
Kafka
研发团队
Rabbit(公司)
Apache(社区)
阿里(公司)
Apache(社区)
开发语言
Erlang
Java
Java
Scala&Java
核心机制
基于AMQW的消息队列模型使用生产者-消费者模式
将消息发布到队列中,然后被消费者订阅和处理
基于JMS的消息传递模型支持点对点模型和发布-订阅模型
分布式的消息队列模型采用主题(Topic)和标签(Tag)的方式
进行消息的分类和过滤
分布式流平台,通过发布-订阅模型进行高吞吐量的消息处理
协议支持
XMPP
STOMP
SMTP
XMPP
STOMP
OpenWireREST
自定义协议
自定义协议社区封装了HTTP协议支持
客户端支持语言
官方支持Erlang,Java,Ruby等社区产出多种API,几乎支持所有语言
JavaC/C++PythonPHPPerl.NET等
JavaC++不成熟
官方支持Java社区产出多种API,如PHP,Python等
可用性
镜像队列
主从复制
主从复制
分区和副本
单机吞吐量
每秒十万左右级别
每秒数万级
每秒十万+级(双十一)
每秒百万级
消息延迟
微秒级
毫秒级
毫秒级
毫秒以内
消息确认
完整的消息确认机制
分支主题
内置消息表,消息保存到数据库实现持久化
分支主题
功能特性
并发能力强,性能极好,延时低,社区活跃,管理界面丰富
老牌产品成熟度高文档丰富
MQ功能比较完备扩展性佳
只支持主要的MQ功能毕竟是专门为大数据领域服务的
Kafka基本结构
Producer
生产者负责将消息发送到 Kafka 集群
Consumer
消费者负责从 Kafka 集群中拉取并消费消息
broker
Broker 是 Kafka 集群中的一个服务代理节点,可以看作是一台服务器
Kafka 集群通常由多个 Broker 组成,以实现负载均衡和容错
防止单点故障
高并发,扩容方便
Topic
对消息进行分类
生产者在发送消息的时候,需要指定发送到某个Topic,然后消息者订阅这个Topic并进行消费消息
Topic 是一个逻辑概念,Partition 是最小的存储单元,掌握着一个 Topic 的部分数据
Partition(分区)
提升性能
每个 Partition 都是一个单独的 log 文件,每条记录都以追加的形式写入
Topic是逻辑概念,而Partition是物理分组
生产者在发送消息的时候,需要指定发送到某个Topic的某个Partition,然后消息者订阅这个Topic并消费这个Partition中的消息
为了提高系统的吞吐量和可扩展性,把一个Topic的不同Partition放到多个Broker节点上,充分利用机器资源,也便于扩展Partition
ConsumerGroup(CG)
消费者组,由多个Consumer组成,每个ConsumerGroup中可以有多个consumer,每个consumer属于一个ConsumerGroup。
同一个Topic下的某一个分区只能被某个消费者组内的同一个消费者所消费,但可以被多个 consumer group 消费
Replication
为了保证数据的安全性和服务的高可用,又在Partition的基础上,引入Replica(副本)的概念
一个Partition包含多个Replica,Replica之间是一主多从的关系,有两种类型Leader Replica(领导者副本)和Follower Replica(跟随者副本)
Replica分布在不同的Broker节点上。
Leader负责接收生产者push的消息和消费者poll消费消息。Follower会实时从自己的Leader中同步数据保持同步
Leader故障时,某个Follower会上位为新的Leader。保证高可用
Offset
生产者Offset:消息写入的时候,每一个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset。
消费者Offset:某个分区的offset情况。例如:生产者写入的offset是最新值是10,当一个Consumer开始消费时,从0消费,一直消费到了5,消费者的offset为5。
Message
kafka集群存储的消息是以topic为类别记录的,每个消息(也叫记录record)是由
In-sync Replicas(ISR)
ISR)已同步副本:表示存活且副本都已和Leader同步的的broker集合,是Leader所有replicas副本的子集。如果某个副本节点宕机,该副本就会从ISR集合中剔除
Kafka中的Broker、Topic、Consumer都会注册到zookeeper
图片
分支主题
kafka安装
kafka下载
https://kafka.apache.org/downloads
解压
cd /opt
tar -zxvf kafka_2.12-3.4.0.tgz
# 修改解压后的文件夹名称为Kafka
mv kafka_2.12-3.4.0 kafka
目录
分支主题
配置环境变量
vim /etc/profile
# 将kafka加入到系统bin命令下配置,实现处处都可以直接使用kafka命令
JAVA_HOME=/opt/jdk1.8.0_152
KAFKA_HOME=/opt/kafka
PATH=/opt/jdk1.8.0_152/bin:/opt/kafka/bin:$PATH
export JAVA_HOME KAFKA_HOME PATH
激活:source /etc/profile
配置kafka的配置文件
编辑config目录下的server.properties(记得提前备份)
listeners=PLAINTEXT://192.168.111.172:9092
advertised.listeners=PLAINTEXT://192.168.111.172:9092
# 上面IP地址为你自己Linux系统的真实IP地址(远程访问需要)
zookeeper.connect=192.168.111.172:2181
# 上面IP地址为你自己Linux系统的真实IP地址
......
示例图片:
分支主题
测试:kafka-topics.sh -version
启动Zookeeper
启动Kafka自身
子主题 9
使用
创建主题Topic
查看主题列表
查看已经创建的Topic信息(查看主题列表)
发送消息
消费消息
停止
zookeeper-server-stop.sh
kafka-server-stop.sh
验证
lsof -i:9092
lsof -i:2181
同一消费者组内竞争
1.新建主题
kafka-topics.sh --bootstrap-server 192.168.200.100:9092 --create --topic kafka-test-group01
2.发送消息
kafka-console-producer.sh --bootstrap-server 192.168.200.100:9092 --topic kafka-test-group01
3.两个消费端接收消息
打开两个不同命令行窗口
执行命令
4.查看消费者组
图片
分支主题
不同消费者组间广播
1.新建主题
2.发送消息
3.两个消费端接收消息
组1接收
组2接收
4.查看消费者组
图片
分支主题
收藏
0 条评论
下一页