spring cloud stream 原理解析
2022-08-03 09:26:35 0 举报
AI智能生成
spring cloud stream 原理解析,关注我,后续有更多的源码解析
作者其他创作
大纲/内容
自动配置类
ChannelBindingAutoConfiguration
1. 创建 PollerMetadata 的bean
BindersHealthIndicatorAutoConfiguration
ChannelsEndpointAutoConfiguration
BindingsEndpointAutoConfiguration
BindingServiceConfiguration
1. globalErrorChannelCustomizer
2. StreamListenerAnnotationBeanPostProcessor
3. BinderFactory
1. BinderTypeRegistry
2. BindingServiceProperties
4. BindingService
5. OutputBindingLifecycle
6. InputBindingLifecycle
7. BinderAwareChannelResolver
8. BinderAwareRouter
9.
FunctionConfiguration
1. @import BindingBeansRegistrar
2. @import BinderFactoryAutoConfiguration
1. 创建 MessageHandlerMethodFactory 的 bean
2. 创建 BinderTypeRegistry 的 bean
3. 创建 MessageConverterConfigurer 的 bean
4. 创建 SubscribableChannelBindingTargetFactory 的 bean
5. 创建 MessageSourceBindingTargetFactory 的 bean
6. 创建 CompositeMessageChannelConfigurer 的 bean
3. 创建 StreamBridge 的 bean
4. 创建 functionBindingRegistrar 的 bean
1. afterPropertiesSet
1. 如果启用 @EnableBinding,就不会激活 functionStream 配置
2. determineFunctionName()
3. filterEligibleFunctionDefinitions
4. FunctionInvocationWrapper function = functionCatalog.lookup(functionDefinition);
5. 注册 BindableFunctionProxyFactory 的 beanDefinition
5. 创建 PollableSourceRegistrar 的 bean
6. 创建 functionInitializer 的 bean
1. afterPropertiesSet
1. 遍历 BindableProxyFactory 的bean
2. this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition)
1. this.assertBindingIsPossible(bindableProxyFactory)
2. this.functionProperties.isComposeFrom() 为 true, 表示是 组合函数
3. isReactiveOrMultipleInputOutput() 对 reactive 的支持
4. createFunctionHandler() 创建函数处理器
1. 创建ServiceActivatingHandler对象
1. 重要方法 handleMessage
2. 重要方法 handleMessageInternal
3. 重要方法 handleRequestMessage
4. 重要变量 private final MessageProcessor<?> processor;
2. ServiceActivatingHandler 重写了 sendOutputs() 方法
1. 根据 message 中的 headers 中变量来动态设置 destination
2. 走默认的方法 super.sendOutputs(result, requestMessage)
5. handler.setOutputChannelName(outputDestinationName);
6. ((SubscribableChannel) inputDestination).subscribe(handler)
RoutingFunctionEnvironmentPostProcessor
概念
binder
binding
function
Binder
核心类
1. UnicastingDispatcher
3. BindingService
2. AbstractMessageChannelBinder
4. ServiceActivatorAnnotationPostProcessor
rocketmq 核心类
2. RocketMQInboundChannelAdapter
重要类 RocketMQListenerBindingContainer
1. createConsumerEndpoint() 调用时,设置了 errorChannel 的机制
1. 重要方法 RocketMQInboundChannelAdapter.BindingRocketMQListener#onMessage
2. 重要方法 RocketMQInboundChannelAdapter.this.sendMessage(message)
1. getRequiredOutputChannel()
3. RocketMQMessageHandler
0. messageHandler.setSendFailureChannel(errorChannel); 设置了 errorChannel
1. 实现了 messageHandler, 关注 handleMessageInternal() 方法
1. 分为 sync 和 async 方式发送
2. sync 方式检查 sendStatus, 如果不是ok,errorChannel#send 来处理
2. async 方式,用 sendCallback 回调方式来处理
Kafka 核心类
1. KafkaMessageDrivenChannelAdapter
2. ProducerConfigurationMessageHandler
示例代码
BindableFunctionProxyFactory#afterPropertiesSet
1. createInput()
PollableMessageSource
SubscribableChannel
SubscribableChannelBindingTargetFactory
2. createOutput()
0 条评论
下一页