spring cloud stream 原理解析
2022-08-03 09:26:35 0 举报
AI智能生成
spring cloud stream 原理解析,关注我,后续有更多的源码解析
作者其他创作
大纲/内容
1. 创建 PollerMetadata 的bean
ChannelBindingAutoConfiguration
BindersHealthIndicatorAutoConfiguration
ChannelsEndpointAutoConfiguration
BindingsEndpointAutoConfiguration
1. globalErrorChannelCustomizer
2. StreamListenerAnnotationBeanPostProcessor
1. BinderTypeRegistry
2. BindingServiceProperties
3. BinderFactory
4. BindingService
5. OutputBindingLifecycle
6. InputBindingLifecycle
7. BinderAwareChannelResolver
8. BinderAwareRouter
9.
BindingServiceConfiguration
1. @import BindingBeansRegistrar
1. 创建 MessageHandlerMethodFactory 的 bean
2. 创建 BinderTypeRegistry 的 bean
3. 创建 MessageConverterConfigurer 的 bean
4. 创建 SubscribableChannelBindingTargetFactory 的 bean
5. 创建 MessageSourceBindingTargetFactory 的 bean
6. 创建 CompositeMessageChannelConfigurer 的 bean
2. @import BinderFactoryAutoConfiguration
3. 创建 StreamBridge 的 bean
1. 如果启用 @EnableBinding,就不会激活 functionStream 配置
2. determineFunctionName()
3. filterEligibleFunctionDefinitions
4. FunctionInvocationWrapper function = functionCatalog.lookup(functionDefinition);
5. 注册 BindableFunctionProxyFactory 的 beanDefinition
1. afterPropertiesSet
4. 创建 functionBindingRegistrar 的 bean
5. 创建 PollableSourceRegistrar 的 bean
1. 遍历 BindableProxyFactory 的bean
1. this.assertBindingIsPossible(bindableProxyFactory)
2. this.functionProperties.isComposeFrom() 为 true, 表示是 组合函数
3. isReactiveOrMultipleInputOutput() 对 reactive 的支持
1. 重要方法 handleMessage
2. 重要方法 handleMessageInternal
3. 重要方法 handleRequestMessage
4. 重要变量 private final MessageProcessor<?> processor;
1. 创建ServiceActivatingHandler对象
1. 根据 message 中的 headers 中变量来动态设置 destination
font color=\"#ff0000\
2. ServiceActivatingHandler 重写了 sendOutputs() 方法
4. createFunctionHandler() 创建函数处理器
5. handler.setOutputChannelName(outputDestinationName);
6. ((SubscribableChannel) inputDestination).subscribe(handler)
6. 创建 functionInitializer 的 bean
FunctionConfiguration
RoutingFunctionEnvironmentPostProcessor
自动配置类
binder
binding
function
概念
1. UnicastingDispatcher
3. BindingService
2. AbstractMessageChannelBinder
4. ServiceActivatorAnnotationPostProcessor
核心类
重要类 RocketMQListenerBindingContainer
1. createConsumerEndpoint() 调用时,设置了 errorChannel 的机制
1. 重要方法 RocketMQInboundChannelAdapter.BindingRocketMQListener#onMessage
1. getRequiredOutputChannel()
2. 重要方法 RocketMQInboundChannelAdapter.this.sendMessage(message)
2. RocketMQInboundChannelAdapter
0. messageHandler.setSendFailureChannel(errorChannel); 设置了 errorChannel
1. 分为 sync 和 async 方式发送
2. async 方式,用 sendCallback 回调方式来处理
3. RocketMQMessageHandler
rocketmq 核心类
1. KafkaMessageDrivenChannelAdapter
2. ProducerConfigurationMessageHandler
Kafka 核心类
Binder
github地址
示例代码
PollableMessageSource
SubscribableChannelBindingTargetFactory
SubscribableChannel
1. createInput()
2. createOutput()
BindableFunctionProxyFactory#afterPropertiesSet
spring cloud stream 原理解析
0 条评论
回复 删除
下一页