kakfa 版本请求
2022-05-08 12:13:26 0 举报
kafka消息发送前,版本请求
作者其他创作
大纲/内容
2
byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey);
buffer
开始
NodeApiVersions
遍历
min_version
ApiKeys 是否存在
0-1
比较:correlationId
api_key
RequestHeader header
ApiKeys apiKey
this.connectionStates.ready(node);
存在
produceRequestVersion
MAGIC_VALUE_V1
Struct responseBody
MAGIC_VALUE_V0
ResponseHeader.parse(responseBuffer)
不存在
NetworkReceive
this.nodeApiVersions.values()
new ApiVersionsResponse(struct)
new IllegalStateException(\"Correlation id for response (
所有node节点都支持的最高版本
inFlightRequests.completeNext(source);
InFlightRequest
public static final Schema SCHEMA = new Schema( new Field(\"correlation_id\
max_version
SCHEMA.read(buffer)
List<ApiVersion> unknownApis
NetworkClient.ApiVersions apiVersions
String source = receive.source();nodeId
不相等
结束
ApiVersion
MAGIC_VALUE_V2
0 条评论
下一页