Spring Cloud Stream 3.1.2
2021-05-25 19:30:25
Source code flowchart
Outline / Content
handlers.add(handler)
dispatcher.addHandler
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue
rocketmq
Send to the locally registered consumer
functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class); registers the BindableFunctionProxyFactory proxy factory class
MessageDispatcher
inputChannel.subscribe(messageHandler);
"spring.cloud.stream.source" is set, used when no registered ...
bindProducer&return messageChannel
Function
this.register(functionRegistration); registers the function
RocketMQ
Message received
createAndBindInputs/createAndBindOutputs
Call
this.functionCatalog.lookup(functionDefinition) looks up the function
Pass in
Register the bean definition
afterPropertiesSet
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue sets the value
channel.send
this.getRequiredDispatcher().dispatch(message)
Send
Function not found
StreamFunctionProperties
Using the consumer as an example
org.springframework.cloud.stream.binder.AbstractMessageChannelBinder#doBindConsumer
initPushConsumer
Message received
messageChannel.send
handlers
Register the listener
IntegrationReactiveUtils.messageChannelToFlux(inputChannel)
org.springframework.cloud.stream.function.FunctionConfiguration#functionInitializer
addInterceptors & return messageChannel
Is a Supplier
function
getBinder obtains the external binding
return
onInit
getBindingTargetFactory
discoverFunctionInBeanFactory (look up the function among the beans)
sendMessage
SubscribableChannel
this.context.registerBean
bindFunctionToDestinations
createInput
this.messagingTemplate.send
MessageChannel
this.messagingTemplate.setBeanFactory(beanFactory);
create
org.springframework.cloud.stream.messaging.DirectWithAttributesChannel#subscribe
resolveDestination
Property injection
doSend
createOutput
RocketMQ listener
Is not a Supplier
createInput/createOutput
Initialization
doStartWithBindable
messageChannel does not exist
Determine the function type
org.springframework.cloud.stream.binding.AbstractBindingLifecycle#start
discoverFunctionType (find the function type)
RocketMQInboundChannelAdapter
return consumerEndpoint
adaptSubscribableChannelToPublisher
Look up the function again
org.springframework.cloud.stream.function.FunctionConfiguration#functionBindingRegistrar
RocketMQ sends the message
AbstractBindableProxyFactory
this.functionCatalog.lookup
Found: return the function
doLookup
consumeMessage
((InitializingBean)consumerEndpoint).afterPropertiesSet();
org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry#lookup
createConsumerEndpoint
this.applicationContext.getBeansOfType(BindableProxyFactory.class) finds the BindableProxyFactory beans
Register
Actual invocation
messageChannel exists
bindingService.bindConsumer
binder.doBindConsumer
Bind
End
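
The subscribe and send path named in the nodes above (inputChannel.subscribe -> dispatcher.addHandler -> handlers.add, then channel.send -> dispatcher.dispatch -> handler) can be modelled in plain Java roughly as follows. This is a simplified sketch under stated assumptions, not the Spring Integration or Spring Cloud Stream source: the names DispatchSketch, Channel, Dispatcher and Handler are hypothetical, messages are plain strings, and the dispatcher simply delivers to every registered handler in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, dependency-free model of the subscribe/dispatch flow.
class DispatchSketch {

    // Stand-in for a message handler; the real interface is not reproduced here.
    interface Handler {
        void handle(String message);
    }

    // Keeps the handler list, mirroring "dispatcher.addHandler -> handlers.add(handler)".
    static class Dispatcher {
        private final List<Handler> handlers = new ArrayList<>();

        void addHandler(Handler handler) {
            handlers.add(handler);
        }

        // Returns false when nobody is subscribed, true after delivery.
        boolean dispatch(String message) {
            if (handlers.isEmpty()) {
                return false;
            }
            for (Handler handler : handlers) {
                handler.handle(message);
            }
            return true;
        }
    }

    // Mirrors "inputChannel.subscribe(messageHandler)" and "channel.send".
    static class Channel {
        private final Dispatcher dispatcher = new Dispatcher();

        void subscribe(Handler handler) {
            dispatcher.addHandler(handler);
        }

        boolean send(String message) {
            return dispatcher.dispatch(message);
        }
    }

    // Binds a user function to an input channel and pushes one message through it.
    static List<String> run() {
        List<String> results = new ArrayList<>();
        Channel inputChannel = new Channel();
        Function<String, String> function = String::toUpperCase; // the "looked-up" function
        inputChannel.subscribe(message -> results.add(function.apply(message)));
        inputChannel.send("hello"); // simulates the broker adapter forwarding a message
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the lambda passed to subscribe plays the role of the handler that wraps the user's function, and send stands in for the inbound adapter handing a received message to the channel.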