KAFKA
2021-12-26 18:14:22 3 举报
AI智能生成
思维导图
作者其他创作
大纲/内容
基础
Kafka 简介
开源、分布式、分区和复制的基于提交日志的分布式发布-订阅消息系统
接收来自不同源系统的数据,并实时将数据提供给目标系统
以 Scala 和 Java 编写,通常与大数据的实时事件流处理相关
为什么选择 Kafka
多个生产者 Multiple Producers
能够无缝处理多个生产者
多个消费者 Multiple Consumers
Kafka 专为多个消费者设计,无需相互干扰即可读取任何单个消息流
基于磁盘的持久化 Disk-Based Retention
可扩展 Scalable
高性能 High Performance
Apache Kafka 的一个关键特性是保留功能,即在一段时间内持久地存储消息。Kafka 代理
配置了主题的默认保留设置,保留消息一段时间(例如,7 天)或直到主题达到某个字节大小
(例如 1GB)。
Kafka 应用场景
信息系统 Messaging
消息代理用于将数据处理与数据生产者分离。
网站活动跟踪 Website Activity Tracking
监控 Metrics
日志收集 Log aggregation
使用 Kafka 可以将日志或事件数据抽象为消息流,从而消除对文件细节的任何依赖。
流处理 Stream processing
Kafka 的诞生
Kafka 是在 2010 年底作为一个开源项目在 GitHub 上发布的
2011 年 7 月被提议并被接受为 Apache 软件基金会孵化器项目
阿帕奇·卡夫卡于2012 年 10 月从孵化器毕业
Kafka 组件
Topics
Topics 就像一个类别/一个索引,它把消息一起组在一起
Kafka 中的消息被分类为 Topics。
Topics 还被分解为多个 Partitions。回到"提交日志"描述,分区是单个日志
生产者 Producer
将消息推送到 Kafka Topics 的流程
Producer 创建新消息
消费者 Consumer
使用来自 Kafka Topics 的消息的进程
消费者阅读信息。
消费者通过跟踪消息的偏移量来
跟踪它已经消费了哪些消息
分区 Partition
一个不可变的 Topics 消息序列,连续地附加到结构化提交日志中
分区也是 Kafka 提供冗余性和可扩展性的方式
由于 Topics 通常具有多个分区,因此不能保证整个 Topics(只需在单个分区内)的消息时间排序
Kafka broker
单个 Kafka 服务器称为一个broker
代理接收来自生产者的消息,为其分配偏移量,并将消息提交到
磁盘上的存储
它还为使用者提供服务,响应分区的提取请求,并响应已提交到磁盘的消息
消费者组 Consumer Groups
消费者作为消费者群体的一部分工作,消费者群体是一个或多个消费者共同消费一个主题。
该组确保每个分区仅由一个成员使用。
消费者可以横向扩展以使用包含大量消息的主题
如果单个使用者
失败,组的其余成员将重新平衡正在使用的分区,以接管丢失的成员。
消息和批处理
Kafka 内部的数据单位称为消息
可以把消息本身理解成一个不透明的数组
消息被成批写入 Kafka。批处理只是消息的集合,所有这些消息都是针对同一
Topics 和分区生成的。
模式 Schemas
安装
设置 kafka 的环境
安装 Java
安装 Zookeeper
Apache Kafka 使用 Zookeeper 存储有关 Kafka 群集的元数据以及使用者客户端详细信
息
安装 Kafka broker
Kafka 代理接收来自生产者的消息,并将它们存储在由唯一偏移键键的磁盘上
Kafka 代理允许使用者按 topic、分区和偏移提取消息
Kafka 代理可以通过使用 Zookeeper 直接或间
接地相互共享信息来创建 Kafka 群集
Kafka 群集只有一个代理充当控制器
创建并验证 topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper
localhost:2181
--replication-factor 1 --partitions 1 --topic test
测试主题生成消息
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list
localhost:9092 --topic test
Test Message 1
测试使用来自主题的消息
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper
localhost:2181 --topic test --from-beginning
Test Message 1
broker配置
General Broker
broker.id
每个 Kafka 代理都必须有一个整数标识符,该标识符是用来设置broker.id 配置。
默认情况下,此整数设置为 0,但可以是任何值。但在一个 Kafka cluster中必须是唯一的。
port
示例配置文件使用 TCP 端口 9092 上的侦听器启动 Kafka。这可以通过更改端口
配置参数设置为任何可用端口
zookeeper.connect
用于存储代理元数据的 Zookeeper 的位置。
hostname、主机名或 Zookeeper 服务器的 IP 地址
port,服务器的客户端端口号
/path,可选的 Zookeeper 路径,用作卡夫卡的 chroot 环境
log.dirs
Kafka 将所有消息保存到磁盘,这些日志段存储在日志目录配置
Topic 默认值
Kafka 服务器配置为创建的主题指定了许多默认配置。
num.partitions
确定创建新主题时使用的分区数,
主要是在启用自动主题创建时(这是默认设置)。
主要是在启用自动主题创建时(这是默认设置)。
log.retention.ms
默认值是在配置文件中使用 log.retention.ms
参数指定的,它被设置为 168 小时或一周。
log.retention.bytes
消息保留值的总字节数是使用
log.retention.bytes 参数设置的,它将应用于
每个分区
log.segment.bytes
日志段已达到 log.segment.bytes 参数指定的大
小(默认值为 1 GB),日志段将关闭并打开一个新
的日志段。日志段一旦关闭,就可以考虑过期。
Kafka 控制台工具
创建 Kafka 主题
创建 Kafka 主题
kafka-topics --create --zookeeper localhost:2181 --replicationfactor 1 --partitions 1 --topic test
Describe a topic
kafka-topics --zookeeper localhost:2181 --describe --topic test
Topics 列表
afka-topics --zookeeper localhost:2181 –list
更改 Topic
# change configuration
kafka-topics --zookeeper localhost:2181 --alter --topic test --
configmax.message.bytes=128000
# add a partition
kafka-topics --zookeeper localhost:2181 --alter --topic test --
partitions 2
Kafka CLI Producer
此工具用于将消息写入 topic
当消息采用基于文本的格式时,它很有用
向 topic 发送简单的字符串消息
kafka-console-producer --broker-list localhost:9092 --topic test
here is a message
用 key 发送信息
kafka-console-producer --broker-list localhost:9092 --topic testtopic \
--property parse.key=true \
--property key.separator=,
key 1, message 1
key 2, message 2
null, message 3
从文件发送消息
kafka-console-producer --broker-list localhost:9092 --topic
test_topic < file.log
Kafka CLI Consumer
从 Kafka 主题读取数据并写入标准输出(控制台)
消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic test
consume 旧message
要查看旧邮件,可以使用--from-beginning
显示 key-value 信息
kafka-console-consumer --bootstrap-server localhost:9092 --topic testtopic \
--property print.key=true \
--property key.separator=,
kafka-simple-consumer-shell
此工具允许您使用来自特定分区的消息。偏移量和副本
parition: 要从中使用的特定分区(默认为全部)
offset: 起始偏移量。使用-2 从开始使用消息,-1 从结尾使用。
max-messages: 要打印的邮件数
replica: 复制副本,默认为broker-leader(-1)
Kafka CLI Conusmer Group
列出消费者组
kafka-consumer-groups --bootstrap-server localhost:9092 --list
log-end-offset 是分区的最高偏移量
current-offset 是使用者实例最后提交的偏移量
owner 是客户端.id(如果未指定,则显示默认值)
lag 是当前消费者补偿和最高补偿之间的差值
删除消费者组
kafka-consumer-groups --bootstrap-server localhost:9092 --delete
--group octopus
Kafka生产者
生产者概述
生产者可以向一个或多个 Kafka topics 发布消息。
Kafka 生产者将记录发送到 topics。这些记录有时被称为消息。
生产者为每个 topics 选择要向哪个分区发送记录。
生产者可以循环发送记录。
生产者可以根据记录的优先级将记录发送到特定分区,从而实现优先级系统
Kafka broker还通过复制为我们提供可靠性和数据保护
生产者组件
Topic
分区
Kafka 主题被划分为多个分区
分区允许您通过在多个代理之间拆分特定主题中的数据来并行化主题。
每个分区都可以放在单独的机器上,以允许多个使用者并行地从一个主题中读取数据。
每个分区都可以放在单独的机器上,以允许多个使用者并行地从一个主题中读取数据。
序列化器
将对象转换为字节流以进行传输的过程称为序列化
broker
broker是一个无状态的 Kafka 服务器
Kafka 生产者和消费者不直接交互,而是使用 Kafka 服务器作为代理来交换消息服务。
Java 生产者 API
KafkaProducer
ProducerRecord
ProducerConfig
Java中通过org.apache.kafka.clients.admin.AdminClient操作topics
创建topic
public CreateTopicsResult createTopics(Collection<NewTopic> newTopics)
删除topic
public DeleteTopicsResult deleteTopics(Collection<String> topics)
列表topic
public ListTopicsResult listTopics()
查询topic
public DescribeTopicsResult describeTopics(Collection<String> topicNames)
创建 Kafka 生产者
要创建 Kafka producer,首先需要设置属性,然后在 ProducerRecord 的帮助下发送消息。
Kafka 生产者的属性列表
key.serializer class:实现的键的序列化程序类
value.serializer class: 实现的值的序列化程序类
acks string:生产者要求领导者在考虑完成请求之前收到的确认数。这将控制发送的记录的持
久性。
久性。
acks=0 如果设置为零,则生产者将不等待来自服务器的任何确认
acks=1 这意味着领导者会将记录写入本地日志,但不会等待所有追随者的完全确认。
acks=all 这意味着领导者将等待所以的同步副本集来确认记录。这可确保只要至少一
个同步副本保持活动状态,记录就会丢失。这是最强的保证。这等效于 acks=-1 设
置。
bootstrap.servers list
生产者可用于缓冲等待发送到服务器的记录的内存总字节数
buffer.memory
生产者生成的所有数据的压缩类型。默认值为无(即无压缩)。
max.block.ms long
配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor()将阻
止多长时间
止多长时间
max.request.size int
请求的最大大小(字节)
request.timeout.ms int
配置控制客户机等待请求响应的最长时间
向 Kafka 发送消息
同步发送消息
消息发送,生产者等待第一条消
息的确认以发送第二条消息。
息的确认以发送第二条消息。
ProducerRecord<String, String> record
= new ProducerRecord<>("Course",
"Kafka", "India");
try { producer.send(record).get(); //line 1}
catch (Exception e)
{ e.printStackTrace(); // line2 }
= new ProducerRecord<>("Course",
"Kafka", "India");
try { producer.send(record).get(); //line 1}
catch (Exception e)
{ e.printStackTrace(); // line2 }
异步发送消息
有时我们不需要等待回复,然后发送下一条消息
为了异步发送消息并仍然处理错误方案,
生产者支持在发送记录时添加回调
生产者支持在发送记录时添加回调
producer.send
(record, new NIITProducerCallback());
(record, new NIITProducerCallback());
配置 Kafka Producer
使用属性创建生产者配置对象
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
使用我们刚才提供的设置创建生产者对象
Producer<String, String> producer = new KafkaProducer<>(props);
创建要推送到 Kafka 主题的消息
ProducerRecord<Integer, String> record = new ProducerRecord<String,
String>("topicName", “key1” “value1”);
调用发送方法
producer.send(record);
发送消息后 ,调用 close 方法
producer.close();
生产者的 maven 依赖
<artifactId>kafka-clients</artifactId>
Kafka消费者
Kafka 消费者概述
消费者和消费者组
消费者组和分区再平衡
Java 消费者 API
高级 API
每个 topic 每个分区的偏移量管理(自动读取 Zookeeper 中消费者组的最后一个
偏移量)
偏移量)
代理故障转移,以及添加或减去分区和消费者时
负载平衡
整合 API
减少依赖性
更好的安全性
Low Level API
更好地控制 Kafka 消息的过度消耗
⚫ 多次读取消息
⚫ 在一个进程中只使用 topic 中分区的一个子集
⚫ 管理事务,确保一条消息只处理一次
⚫ 在一个进程中只使用 topic 中分区的一个子集
⚫ 管理事务,确保一条消息只处理一次
更灵活的控制
⚫ 偏移不再透明
⚫ 需要处理代理自动故障转移
⚫ 添加消费者、分区和代理需要自己进行负载平衡
⚫ 需要处理代理自动故障转移
⚫ 添加消费者、分区和代理需要自己进行负载平衡
创建 Kafka 消费者
要创建 Kafka 消费者,首先需要设置属性,然后在 ConsumerRecord 的帮助下接
收消息。
KafkaConsumer<String, String> consumer
= new KafkaConsumer<>( props );
= new KafkaConsumer<>( props );
订阅 topic
subcribe()方法将 topic 列表作为参数,以订阅:
consumer.subscribe(Collections.singletonList("topicName"));
poll 循环
一旦消费者订阅了 topic,poll 循环将处理协调、分区重新平衡、心跳和数据
获取的所有细节
获取的所有细节
配置 Kafka 消费者
创建消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" +
KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CLIENT_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" +
KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CLIENT_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer);
创建消费者
KafkaConsumer<Integer,String> consumer = new KafkaConsumer<>(props);
读取消息
让消费者订阅特定 topic
consumer.subscribe(Collections.singletonList(this.topic));
获得一些新数据
ConsumerRecords<Integer,String> records = consumer.poll(100)
消费记录
for (ConsumerRecord<Integer, String>record : records)
{
System.out.println("Received message:
(" + record.key() + ", + record.value() + ") at offset
" + record.offset());
}
Kafka 消费者属性列表
监控和管理
主题操作
创建主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --
replication-factor 1 --partitions 1
replication-factor 1 --partitions 1
为主题添加分区
kafka-topics.sh --bootstrap-server localhost:2181 --alter --topic mytopic --
partitions 2
partitions 2
目前的版本 Kafka 不允许减少分区的操作, 如果那样做会导致
InvalidPartitionsException. 异常发生
删除主题
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic
如果想要真正将其删除,需要在 server.properties 里面加上一行配置如下:
delete.topic.enable=true
delete.topic.enable=true
列出集群中的主题
> bin/kafka-topics.sh --zookeeper localhost:2181 --list
消费者操作
列出和描述消费者群组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
可以通过将--list 更改为--describe 并添加--group 参数
来获得更多详情信息,比如偏移量。这将列出
指定消费者组正在使用的所有主题,
以及每个主题分区的偏移量
来获得更多详情信息,比如偏移量。这将列出
指定消费者组正在使用的所有主题,
以及每个主题分区的偏移量
bin/kafka-consumer-groups.sh
--bootstrap-server localhost:9092 --describe --
group my-group
--bootstrap-server localhost:9092 --describe --
group my-group
删除消费者群组
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 -delete --group
my-group --group my-other-group
my-group --group my-other-group
当要删除的消费者组不为空时,执行上面命令你将得到以下错误: GroupNotEmptyException。
手动删除一个或多个消费者组,可以使用--delete
偏移量管理
使用--delete-offsets 选项删除消费者组的偏移
量此选项同时支持一个消费者组和一个或多个主题
量此选项同时支持一个消费者组和一个或多个主题
kafka-consumer-groups.sh --bootstrap-server
localhost:9092 --delete-offsets --
group my-group --topic my-topic-1 --topic my-topic-2
localhost:9092 --delete-offsets --
group my-group --topic my-topic-1 --topic my-topic-2
重置使用者组的偏移量,可以使用“--reset-offsets”选项。此选项一次只支持一个消费者组。它需
要定义以下范围:--all-topic 或--topic
要定义以下范围:--all-topic 或--topic
动态配置修改
覆盖主题默认配置
覆盖客户端默认配置
Kafka 客户端唯一可以覆盖的配置是生产者和消费者的配额,即允许具有指定客户端 ID 的所有客户
端在每个 broker 上每秒生产或者消费的字节数
端在每个 broker 上每秒生产或者消费的字节数
显示覆盖的配置
可以使用命令行工具 kafka-configs.sh 来检查主题或客户机的特定配置。显示覆盖的配置
(Describing Configuration Overrides)需要使用--describe 选项。
(Describing Configuration Overrides)需要使用--describe 选项。
删除覆盖的配置
可以完全删除动态配置,这将导致集群配置恢复到默认值,要删除配置覆盖,请使用--alter 命令以及
--delete-config 命令。
--delete-config 命令。
kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --
entity-name my-topic --delete-config retention.ms
包含在一个名为 kafka-config.sh 的命令行工具脚本
监控 Kafka
Kafka 在服务器和客户端都使用 Yammer Metrics 来测量系统的运行状态
查看可用指标(Metrics)的最
简单方法是启动 Java 自带的 Jconsole 工具
Jconsole 是基于Java Management Extensions的,通过 JMX 就可以查看连接到的客户端或者服务器上的所有指标
监控服务器状态
监控生产者状态
监控消费者状态
kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --
entity-name my-topic --delete-config retention.ms
entity-name my-topic --delete-config retention.ms
Kafka 默认禁用远程 JMX,但我们通过为执行 CLI 命令所在的进程中设置环境变量 JMX_PORT,或者设置标准
Java 系统属性,就可以通过编程方式启用远程 JMX
Java 系统属性,就可以通过编程方式启用远程 JMX
流处理
数据流(也称为事件流或流数据)是表示无限数据集的抽象
流处理是指对一个或多个事件流进行的处理。流处理与请求-响应和批处理
一样是一种编程范式
一样是一种编程范式
事件流模型的属性
无界性
对事件流进行排序
数据记录不可变
事件流可重播
请求-响应
批处理
Stream 处理
流处理的一些关键概念
时间
事件时间
是我们跟踪的事件发生和记录创建的时间
日志附加时间
是事件到达 Kafka 代理并存储在那里的时间
处理时间
是流处理应用程序接收事件以执行某些计算的时间
状态
本地或内部状态
只能由流处理应用程序的特定实例访问的状态
外部状态
优点是它的大小几乎是无限的,并且可以从应用程序的
多个实例甚至不同的应用程序访问它。缺点是额外的系统会带来额外的
延迟和复杂性
多个实例甚至不同的应用程序访问它。缺点是额外的系统会带来额外的
延迟和复杂性
流和表的对偶性
流包含更改的历史记录
表包含当前状态
时间窗口
窗口大小
窗口移动的频率(提前间隔)
窗口保持可更新的时间
流处理设计模式
单事件处理
流处理的最基本模式是单独地处理每个事件。这也被称为 map/filter 模式
它通常用于从流中过滤不必要的事件或转换每个事件
在这种模式中,流处理应用程序使用流中的事件,修改每个事件,然后
将事件生成到另一个流。
将事件生成到另一个流。
例如,一个应用程序从流中读取日志消息并将错误
事件写入高优先级流,将其余事件写入低优先级流。另一个例子是从流中读
取事件并将其从 JSON 修改为 Avro 的应用程序
本地处理状态
大多数流处理应用程序都关注聚合信息,尤其是时间窗口聚合
Kafka Streams: 架构概述
Building a Topology(构建拓扑)
每个 streams 应用程序至少实现和执行一个拓扑
Scaling the Topology(扩展拓扑)
Kafka Streams 通过允许在应用程序的一个实例中执行多个线程以及支持应
用程序的分布式实例之间的负载平衡来进行扩展
用程序的分布式实例之间的负载平衡来进行扩展
Streams 引擎通过将拓扑拆分为任务来并行执行拓扑
Surviving Failures(故障容错)
允许我们扩展应用程序的同一个模型也允许我们处理失败
Stream 处理用例
Customer Service(客户服务)
Internet of Things(物联网)
Fraud Detection(欺诈检测)
0 条评论
下一页