深入理解Kafka核心设计与
2022-06-12 15:30:35 0 举报
AI智能生成
Kafka原理
作者其他创作
大纲/内容
主题和分区
分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段,每个日志分段可以分为索引文件、日志存储文件和快照文件等
主题的管理
一个主题可以包含多个分区,一个分区可以有多个副本,一个副本是一个日志(LOG)文件
目前Kafka只支持增加分区不支持减少分区。
主题端参数
Topic参数/Broker参数
cleanup.policy/log.cleanup.policy:日志压缩策略。默认delete,可配compact。
compression.type/compression.type:消息的压缩类型。默认producer,保留原始压缩类型。可配uncompressed、snappy、lz4、gzip。
delete.retention.ms/log.cleaner.delete.retention.ms:被标识为删除的数据能保存多久。默认1天。
file.delete.delay.ms/log.segment.delete.delay.ms:清理文件之前可以等待多长时间,默认1分钟。
flush.messages/log.flush.interval.messages:需要收集多少消息才能强制刷新到磁盘,默认Long.MAX_VALUE。让OS决定。
flush.ms/log.flush.interval.ms:需要等待多久将消息强制刷新到磁盘,默认Long.MAX_VALUE。让OS决定。
max.message.bytes/message.max.bytes:消息的最大字节数,默认值1000012
rerention.ms/log.retention.ms:使用delete的日志清理策略时消息能够保留多久,默认7天。
segment.bytes/log.segment.bytes:日志分段的最大值,默认1GB。
segment.index.bytes/log.index.size.max.bytes:日志分段索引的最大值,默认10MB。
KafkaAdminClient
有时候希望将主题管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台。
分区的管理
优先副本的选举
分区使用多副本来提升可靠性,但只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步。副本不做任何对外提供服务的操作。
分区重分配
本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本。
复制限流
如果集群中某个主题或某个分区的流量在某段时间内特别大,那么只靠减少粒度是不足以应对的,这时需要一个限流的机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大影响。
复制限流主要有两种实现方式:kafka-config.sh脚本和kafka-reassign-partitions.sh脚本。
修改副本因子
创建主题后可以修改分区的个数,同样可以修改副本因子(副本数)。
选择合适的分区数
性能测试工具:kafka-producer-perf-test.sh
分区数越多吞吐量越高吗?并不是
分区数上限?大几千
考量因素:吞吐量
分区数会占用文件描述符,而一个进程所能支配的文件描述符是有限的,也就是通常所说的文件句柄的开销。
分区数过多会让Kakfa的启动和关闭耗时变长。
日志存储
文件目录布局
一个分区对应一个日志。为了防止Log过大,引入了日志分段(LogSegment),将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,便于消息的维护和清理。
Log在物理上以文件夹的形式存储,LogSegment对应于磁盘上的一个日志文件和两个索引文件间(可能还有.txnindex的事务索引文件)。
LogSegment包括日志文件、偏移量索引文件、时间戳索引文件、其他文件。
日志格式演变
V0:offset(偏移量)+message size(消息大小)+record (消息本身) = 消息。
V1:多个一个timestamp字段,表示消息的时间戳。
消息压缩:Kafka实现的压缩方式是将多条消息一起进行压缩。compression.type配置压缩算法。
V2:Record Batch,内部包含了一条或多条消息。
日志索引
每个日志文件对应了两个索引文件,偏移量索引文件+时间戳索引文件。
当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
日志分段达到一定的条件需要切分,对应的索引文件也需要切分。
日志分段文件的大小超过了broker参数log.segment.bytes配置的值,默认1GB。
当前日志分段中消息的最大时间戳与系统时间戳差值大于log.roll.ms或log.roll.hours的值。默认log.roll.hours为168,7天。
偏移量文件或时间戳文件的大小达到broker端参数log.index.size.max.bytes配置的值。默认10MB。
追加的消息偏移量与日志分段的偏移量之间的差值大于Integer.MAX_VALUE。
偏移量索引
relativeOffset:相对偏移量,消息相对于BaseOffset的偏移量,索引文件名为baseOffset的值。relativeOffset = offset - baseOffset
position:物理地址,消息在日志分段文件中对一个的物理位置。
时间戳索引
timestamp+relativeOffset:日志分段的最大时间戳+时间戳对应的消息的相对偏移量。
根据时间戳找偏移量 -> 根据偏移量赵物理文件位置 -> 根据物理文件位置处扫描日志分段文件找到消息
日志清理
日志删除:按照一定的保留策略直接删除不符合条件的日志分段。
日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。
日志分段的保留策略
基于时间的保留策略:默认保留7天(根据日志分段的最大时间戳计算)。
基于日志大小的保留策略:检测日志大小是否超过设定的阈值来删除日志分段的文件,阈值log.retention.bytes默认-1表示无穷大(所有日志文件的总大小)。log.segment.bytes单个日志分段的大小,默认1GB。
基于日志起始偏移量的保留策略:某日志分段的下一个分段的起始偏移量baseOffset是否小于等于logStartOffset,如果是,则删除此日志分段。
磁盘存储
文件追加方式,并且不允许修改已写入的消息。
页缓存:OS实现的一种磁盘缓存,以此减少对磁盘IO的操作。Kafka的消息先写入页缓存,然后由操作系统执行刷盘任务。
零拷贝:将数据直接从磁盘文件复制到网卡设备中,不需要经由程序之手。减少内核和用户模式之间的上下文切换。依赖于底层sendfile实现。
深入服务端
//TODO
深入客户端
//TODO
可靠性探究
Kafka多副本之间如何进行同步?多副本间的数据一致性如何解决,一致性协议是什么?
如何保证Kafka的可靠性?Kafka中的可靠性和可用性之间的关系又如何?
副本剖析
副本是特定分区的副本。follower副本只负责数据同步。
失效副本:包括功能失效的副本、处理同步失效状态的副本。
原理:当follower副本将leader副本LEO之前的日志全部同步时,认为follower副本已经追赶上leader副本,更新副本lastCaughtUpTimeMs标识。Kafka启动一个副本过期检测的定时任务。
ISR的伸缩
子主题
LEO与HW
follower副本从leader副本拉取到消息,更新自己的LEO。
与此同时更新HW,更新HW:MIN(当前LEO,leader副本传送过来的HW值)。
Leader Epoch的介入
Kafka为什么不支持读写分离?
Kafka
优点:让从节点分担主节点的负载压力。对于数据写压力很大而读压力很小的情况,从节点只能分摊很少的负载压力。
日志同步机制
日志同步要保证数据的一致性、顺序性。
日志同步机制一个基本原则:如果告知客户端已经成功了提交了某条消息,那么即使leader宕机,也要保证新选举出来的leader中能够包含这条消息。
可靠性分析
如何确保Kafka完全可靠?
一般来说越多的副本数越能保证数据的可靠性。
生产者客户端参数acks。acks=-1,等待所有ISR集合写入成功。acks=1,只保证leader写入成功。
消费端:enable.auto.commit参数false,表示手动提交位移。
初识Kafka
多分区、多副本且基于ZooKeeper协调的分布式消息系统
消息系统:消息中间件具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性功能。Kafka提供了消息顺序性保障及回溯消费功能。
存储系统:消息持久化到磁盘,有效地降低了数据丢失的风险。Kafka的消息持久化功能和多副本机制,可以把Kafka作为长期的数据存储系统来使用。
流式处理平台:Kafka提供了一个完整的流式处理类库,如窗口、连接、变换和聚合等操作。
概念
Producer:生产者。Consumer:消费者。Broker:服务代理节点。
主题和分区:逻辑上的一个概念,还可以细分为多个分区。Kafka通过offset保证消息在分区内的顺序性。
多副本机制:增加副本数量可以提升容灾能力。
Consumer使用拉(Pull)模式从服务端拉取消息,并且保证消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。
AR:分区中所有的副本统称AR。
ISR:与leader副本保持一定程度同步的副本(包括leader副本)。
OSR:与leader副本同步滞后过多的副本组成OSR。
AR=ISR+OSR(正常情况下,所有的follwer副本都应该与leader副本保持一定程度的同步,即AR=ISR)
HW:俗称高水位,标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
LEO:标识当前日志文件中下一条待写入消息的offset(当前日志分区中最后一条消息的offset值+1)。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。同步复制极大地影响性能。异步复制会造成数据的丢失。ISR的方式有效地权衡了数据可靠性和性能之间的关系。
生产与消费
Kafka提供了一些测试脚本做一些测试工作。
服务端参数配置
$KAFKA_HOME/config/server.properties
zookeeper.connect:指明broker要连接的Zookeeper集群的服务地址。
listeners:broker监听客户端连接的地址列表,配置格式protocol://hostname:port。Kafka支持的协议有PLAINTEXT、SSL、SASL_SSL,如果未开启安全认证,使用简单的PLAINTEXT即可。PLAINTEXT://198.162.0.2:9092
broker.id:指定Kafka集群中broker的唯一标识,默认值-1。如果没有设置,Kafka会自动生成一个。
log.dir和log.dirs:配置Kafka日志文件存放的根目录。log.dir配置单个根目录,log.dirs配置多个根目录。log.dirs的优先级比log.dir高。默认情况下只配置了log.dir参数,默认值/tmp/kafka-logs
message.max.bytes:指定broker所能接收消息的最大值,默认值约等于976.6KB。
生产者
客户端
必要的参数配置
主题、分区号、消息头部、值、消息的时间戳
以主题为单位进行归类,而Key可以让消息再进行二次归类,同一个Key的消息会被划分到同一个分区中。
有Key的消息支持日志压缩功能。value是消息体,一般不为空,如果为空表示特定消息——墓碑消息。
KafkaProducer的3个配置参数:bootstrap.servers(生产者客户端连接Kafka集群的broker地址清单)、key.serializer和value.serializer(broker端接收的消息必须以字节数据的形式存在)
KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。
消息的发送
发送消息的模式:发后即忘(fire-and-forget)、同步(sync)、异步(async,Callback()回调函数)。
同一个分区,如果msg1比msg2之前先发送,那么KafkaProducer就可以保证对应的callback1在callback2之前调用,回调函数的调用也可以保证分区有序。
序列化
序列化:生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka,消费者要使用反序列化器。
分区器
分区器:消息在发往broker的过程,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用才能被真正地发往broker。拦截器一般不是必须的。分区器的作用就是为消息分配分区。
如果key不为null,那么默认的分区器会对key进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),拥有相同的key会被写入同一个分区。
如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
如果key为null,计算得到的分区号是所有分区中的任意一个;如果key不为null,计算得到的分区号为可用分区中的任意一个。
生产者拦截器
生产者拦截器:消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
原理分析
整体架构
主线程:KafkaProducer -> 拦截器 -> 序列化器 -> 分区器 -> 消息累加器(每个分区有一个消息双端队列)
Sender线程:创建Request -> Selector -> 等待Response -> 清理消息累加器
整个生产者客户端由两个线程协调运行,分别为主线程和Sender线程。主线程由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用后缓存到消息累加器中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
消息累加器主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足(这时KafkaProducer的send方法要么被阻塞,要么抛异常)。
主线程发送过来的消息都会被追加到RecordAccumulator的某个双端队列中,在消息累加器的内部为每个分区都维护了一个双端队列。
元数据的更新
元数据是指Kafka集群的元数据,记录了集群中有哪些主题,有哪些分区,每个分区leader在哪个节点上。
InFlightRequests(发送请求)可以获得leastLoadedNode,即所有Node中负载最小的那一个。未确认的请求越多则认为负载越大。
leastLoadedNode是用于元数据请求、消费者组播协议的交互场景。
重要的生产者参数
acks:指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
ack=1:只要分区的leader副本成功写入消息,它就会收到来自服务端的成功响应。
ack=0:生产者发送完消息不需要等待任何服务端的响应。
ack=-1或ack=all:发送消息后,需要等待ISR中的所有副本都成功写入消息之后才能收到来自服务端的成功响应。
max.request.size:限制生产者客户端能发送的消息的最大值,默认1MB。设计的其他参数联动,比如broker的message.max.bytes (接收消息的最大值) 。
retries:配置生产者重试的次数,默认值0(发生异常不进行任何重试动作),场景:网络抖动、leader副本的选举(内部重试能恢复的异常)。
retry.backoff.ms:设置两次重试之间的时间间隔,默认100,避免无效的频繁重试。最好估算一下可能的异常回复时间(目的重试时间>异常恢复时间)。
max.in.flight.requests.per.connection:在需要保证顺序的场合配置为1,而不是把retries设置为0。
compression.type:指定消息的压缩方式,默认 none(消息不压缩),还可配置为 gzip、snappy、lz4。压缩可以极大地减少网络传输量、降低网络I/O。
connections.max.idle.ms:指定多久之后关闭闲置的连接,默认540s(9min)。
linger.ms:生产者客户端会在ProducerBatch被填满或等待时间超过linner.ms值时发送出去,默认0。
receive.buffer.bytes:设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认32KB。
send.buffer.bytes:设置Socket发送消息缓冲区(SO_RECBUF)的大小,默认128KB。
request.timeout.ms:配置Producer等待请求响应的最长时间,默认30s。
消费者
消费者和消费者组
每一个分区只能被一个消费组中的一个消费者所消费。一个消费者可以订阅多个分区。
partition.assignment.strategy:设置消费者与订阅主体之间的分区分配策略。
Kafka支持两种消息投递模式:点对点模式(基于队列)、发布/订阅模式。
点对点模式:如果所有的消费者都隶属于同一个消费组,那么消息都会被均衡地投递给每一个消费者(一对一),即每条消息只会被一个消费者处理。
发布/订阅模式:如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者(一对多),即每条消息会被所有的消费者处理。
同一个消费组内的消费者即可以部署在同一台机器上,也可以部署在不同的机器上。
客户端开发
消费步骤
配置消费者客户端参数及创建响应的消费者实例。
订阅主题。
拉取消息并消费。
提交消费位移。
关闭消费者实例。
必要参数
Kafka消费者客户端KafkaConsumer中的4个参数,如下:
bootstrap.servers:指定连接Kafka集群所需的broker地址清单。
group.id:消费者隶属的消费组的名称(一般设置具有业务意义的名称)。
key.deserializer和value.deserializer:与生产者的中的key和value的序列化对应。
订阅主题与分区
一个消费者可以订阅一个或多个主题。
如果消费者采用正则表达式的方式订阅,在之后如果有人创建了新的主题,并且名字和正则表达式相匹配,那么消费者可以消费到新添加的主题中的消息。
消费者还可以直接订阅某些主题的特定分区(KafkaConsumer的assign方法)。
Kafka消费者的订阅状态:集合订阅的方式(subscribe(Collection))、正则表达式订阅的方式(subscribe(Pattern))、指定分区订阅(assign(Collection))的的方式。这三种状态是互斥的。
通过subscribe方法订阅分区是具备消费者自动均衡的,而assign不具备。
反序列化
反序列化器:ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer。
实际应用中,在Kafka提供的序列化器和反序列化器满足不了应用需求的时候,推荐使用Avro、JSON、Thift、ProtoBuf或Protostuff等序列化工具来包装。
消费消息
Kafka中的消息是基于拉模式的。
消息的消费模式:推模式、拉模式。
Kafka消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll方法,而poll方法返回的是所订阅的主题上的一组消息。
ConsumerRecord类属性:主题、分区、偏移量、时间戳、TimestampType(两种类型,消息创建时间、消息存储时间)、Key和value经过序列化后的大小、消息的头部内容headers、CRC32的校验值。
位移提交
偏移量:对于分区而言,每条消息都有唯一的offset,用来表示消息在分区中对应的位置。
消费位移:对于消费者而言,也有一个offset,消费者使用offset表示消费到分区中某个消息所在的消息。
对于一条消息而言,偏移量和消费者消费它的消费位移是相等的。
在旧的消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。
把消费位移存储起来的动作称为 "提交",消费者在消费完消息之后需要执行消费位移的提交。
当前消费者消费到的位置为x,那么消费者需要提交的消费位移并不是x,而是x+1。
消息丢失:当前一次拉取消息集[x+2,x+7],如果拉取消息后就进行位移提交,即x+8,那么当前消费x+5遇到异常,故障恢复后,重新拉区消息是从x+8开始的。
重复消费:位移提交的动作是在消费完所有拉取到的消息之后才执行的。当消费x+5遇到异常,在故障恢复后,又重新拉区的消息是从x+2开始的。x+2到x+4之间的消息又重新消费了一遍。
自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象。
Kafka提供手动位移提交的方式,手动的提交方式可以让开发人员根据程序逻辑在合适的地方进行位移提交。
enable.auto.commit:false。
手动提交分为同步提交和异步提交。
同步提交:对拉取到的每一条消息做相应的逻辑处理,或批量处理+批量提交的方式。也有可能出现重复消费问题(消费位移提交前程序突然崩溃)。commitSync()方法会根据poll方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。
异步提交:commitAsync()方法在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。
重复消费:commitAsync提交的时候也会失败。一般能想到的方法是重试。如果某一次提交的消费位移为x,但是提交失败了,然后下一次又异步提交了消费位移为x+y,这次成功了。如果这时重试第一次提交的消费位移x,那么此时的消费位移又变为了x。这样就会引入重复消费问题。
解决方法:设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号对应的值。遇到位移提交失败需要重试的时候,可以检查提交的位移和序号的值的大小,如果前者小于后者,说明有更大的位移已经提交了;如果相等,进行重试提交。
控制或关闭消费
KafkaConsumer提供了对消费速度进行控制的方法,有时我们需要暂停对某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。
KafkaConsumer是非线程安全的,调用wakeup方法后可以退出poll的逻辑。跳出循环后一定要显示地执行关闭动作以释放运行过程中占用的各种系统资源。
指定位移消费
当一个新的消费组建立的时候,或者消费组内的一个新消费者订阅了一个新的主题,或者__consumer_offsets主题中关于这个消费组的位移信息过期而被删除后,没有可以查找的消费位移。
Kafka中消费者找不到记录的消费位移时,会根据消费者客户端参数auto.offset.reset的配置决定从何处开始进行消费,默认值"latest",表示从分区末尾开始消费消息。参数"earliest"表示消费者从0开始消费。参数"none"抛异常(前提是找不到消费位移)。
此时用一个新的消费组来消费某个主题时,客户端会报出重置位移的提示信息。Resetting offset for partition。
seek方法指定某个分区的位移消费
再均衡
指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
再均衡期间,消费组内的消费者是无法读取消息的。
为了防止重复消费问题,一般情况下,应该避免不必要的再均衡发生。当然也可以通过异步提交消费位移的操作避免重复消费。
消费者拦截器
主要在消费到消息或在提交消息位移时进行一些定制化的操作。
KafkaConsumer会在poll方法返回之前调用拦截器的onConsume方法来对消息进行定制化操作,比如修改返回的消息内容、按照某种规则过滤消息。
KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit方法,可以使用这个方法来记录跟踪所提交的位移信息。
某些业务场景中会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就会被视为无效。自定义拦截器ConsumerInterceptorTTL实现ConsumerInterceptor<>接口。
多线程实现
KafkaConsumer定义了一个acquire方法,用来检测当前是否只有一个线程在操作,若有其他线程在操作则会抛出ConcurrentModifcationException异常。
acquire通过线程操作计数标记的方式检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()和release()表示加锁和解锁。
多个消费线程同时消费同一个分区,通过assign、seek 方法实现。这种实现对于位移提交和顺序控制的处理会变得非常复杂。实际应用使用的很少。
将处理消息模块改成多线程的实现方式。
重要的消费参数
fetch.min.bytes:配置Consumer在一次拉取请求中能从Kafka中拉取的最小数据量,默认1B。
fetch.max.bytes:配置Consumer在一次拉取请求中从Kafka拉取的最大数据量。默认值50MB。如果此值比Kafka任何一条消息都小,也是可以消费的。
fetch.max.wait.ms:指定Kafka的等待时间。
max.partition.fetch.bytes:配置从每个分区返回给Consumer的最大数据量,默认值1MB。
max.poll.records:Consumer在一次拉取请求中拉取的最大消息数,默认值500条。
connection.max.idle.ms:在多久之后关闭闲置的连接,默认值9分钟。
exclude.internal.topics:Kafka的两个内部主题,__consumer_offsets和__transaction_state。exclude~指定Kafka中的内部主题是否可以向消费者公开,默认true。
receive.buffer.bytes:设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认64KB。-1表示使用系统的默认值。
send.buffer.bytes:设置发送消息缓冲区(SO_SNDBUF)的大小,默认值128KB。
request.timeout.ms:Consumer等待请求响应的最长时间,默认30s。
metadata.max.age.ms:配置元数据的过期时间,默认值5分钟。
reconnect.backoff.ms:配置尝试重新连接指定主机之前的等待时间,避免频繁地连接主机,默认50ms。
retry.backoff.ms:配置尝试重新发送失败的请求到指定的主题分区之前的等待时间,避免在某些故障情况下频繁地重复发送,默认100ms。
isolation.level:配置消费者的事务隔离级别。默认 "read_uncommitted",可以消费到HW处的位置。"read_committed",可以消费到LSO处的位置。
0 条评论
下一页