消息系统
2020-07-21 09:52:36 0 举报
消息系统
作者其他创作
大纲/内容
AbstractMessageChannelBinder.doBindConsumer()
restTemplate回写消息
消费服务方法
消息发送
Redis
KafkaTopicProvisioner.provisionConsumerDestination()
创建ConsumerDestination
ListenerConsumer.run()
动态创建输入通道、输出通道
死循环接收消息
业务代码中可以根据自己需求调用
缓存维护消息中间件连接信息、消息事件信息
Rabbit
初始化KafkaadminClientProperties参数
扫描所有添加@StreamListener注解方法缓存到Map
调用KafkaTemplate发送消息
消息
对应通道消息
获取特定绑定器
记录消息发送情况
KafkaMessageChannelBinder.createConsumerEndpoint()
消费端查找控制器
消费者
消息处理
接收消息通道
StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated()
BindableProxyFactory.createAndBindInputs()绑定接收通道并开启接收
生产者
将@StreamListener修饰的方法与对应的订阅通道绑定
KafkaBinderConfiguration.kafkaMessageChannelBinder()
动态更新消息中间件连接信息、动态更新本服务消息事件信息
绑定生产者
SubscribableChannel.subscribe()
维护基础数据信息
初始化配置参数KafkaMessageChannelBinder
StreamListenerAnnotationBeanPostProcessor.postProcessAfterInitialization()
服务启动时Spring 初始化
动态创建KafkaProducerMessageHandler
KafkaProducerMessageHandler.handleRequestMessage()
事件信息数据
StreamListener
反射调用消费端业务逻辑
消费端口
发送指定topic消息,到对应消息中间件
记录消费情况
KafkaMessageListenerContainer.doStart()
AbstractBindingTargetFactory.createOutput()
Mysql
Kafka
消费对应消息中间件topic消息
初始化获取yml文件配置参数
通道绑定具体发送消息
查找能处理消息的Bean
KafkaMessageChannelBinder.createConsumerEndpoint()
记录消息中间件连接信息、消息事件信息
KafkaBinderConfiguration.configurationProperties()
创建KafkaMessageDrivenChannelAdapter
DispatchingStreamListenerMessageHandler.handleRequestMessage
发送消息到对应中间件
动态更新消息中间件连接信息
动态注册BeanDefinition
DispatchingStreamListenerMessageHandler.handleRequestMessage()
发送消息控制器
AbstractMessageChannelBinder.doBindProducer()
创建MessageProducer
获取对应消息消费端点
Rocket
restTemplate回写消息发送情况
BindableProxyFactory.bindOutputs()绑定发送通道,并初始化发送配置
消息接收
事件服务端
KafkaMessageChannelBinder.createProducerMessageHandler()
StreamListenerMessageHandler.handleRequestMessage()
EventSender.fireEvent()
KafkaBinderConfiguration.provisioningProvider()
绑定服务
0 条评论
下一页