Spring-kafka启动流程
2020-04-17 09:57:41 1 举报
spring-kafka-KafkaListener的只执行流程
作者其他创作
大纲/内容
KafkaMessageListenerContainer
KafkaListenerEndpointRegistry#startIfNecessary
KafkaBootstrapConfiguration
KafkaMessageListenerContainer#doStart
KafkaListenerEndpointRegistrar#setEndpointRegistry
标注了KafkaListener的方法来处理
ConsumerSeekAware#registerSeekCallback
DefaultKafkaConsumerFactory#createKafkaConsumer
KafkaListenerEndpoint#setupListenerContainer
调用KafkaListener标记的方法
KafkaListenerEndpointRegistry#start
startImmediately
MethodKafkaListenerEndpoint#createMessageListenerInstance
postProcessAfterInitialization
kafkaListener的持有者
SubscriptionState
KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated
ListenerConsumer#invokeListener
ConcurrentMessageListenerContainer
run()
KafkaListenerAnnotationBeanPostProcessor
根据concurrency 并发数创建对应的消息容器个数
ConcurrentMessageListenerContainer#doStart
ListenerConsumer#processSeeks
DefaultKafkaConsumerFactory#createConsumer
true
MessagingMessageListenerAdapter#setHandlerMethod
KafkaMessageListenerContainer.ListenerConsumer
ConcurrentKafkaListenerContainerFactory#createContainerInstance
找到KafkaListener
ConsumerCoordinator
EnableKafka
AbstractKafkaListenerEndpoint#setupListenerContainer
AbstractKafkaListenerContainerFactory#createListenerContainer
KafkaListenerEndpointDescriptor
先把拉取出来的需要重新处理的offset放入consuer对应的消费队列中,然后再重新来取出来,
AbstractMessageListenerContainer#start
默认为false
KafkaListenerEndpointRegistrar#registerAllEndpoints
MethodKafkaListenerEndpoint#configureListenerAdapter
KafkaListenerEndpointRegistrar#afterPropertiesSet
initializeContainer
KafkaListenerEndpointRegistry
KafkaConsumer
MethodKafkaListenerEndpoint#createMessageListener
调用后置处理器
endpointDescriptors
listenerContainers
在消费的时候可以通过这个接口重新把这条消息方法consumer拉取的消息队列中,可以再次消费
kafka消费者
KafkaListenerEndpointRegistry#createListenerContainer
ListenerConsumer#pollAndInvoke
AbstractMessageListenerContainer#doStart
KafkaListenerAnnotationBeanPostProcessor#processKafkaListener
AbstractKafkaListenerEndpoint#setupMessageListener
设置concurrency 并发数
KafkaListenerAnnotationBeanPostProcessor#processListener
收藏
收藏
0 条评论
回复 删除
下一页