2021.09 - Dubbo Startup Source Code Analysis, 2.6.x
2021-09-06 17:03:02
Dubbo startup source code analysis, based on version 2.6.10
Outline / Content
Whether to load the default direct-connect mode (generally used for debugging): 1. the JVM system property dubbo.resolve.file; 2. ${user.home}/dubbo-resolve.properties
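The lookup order above can be sketched as follows. This is a minimal illustration, not Dubbo's code: the class and method names (`ResolveFileSketch`, `locate`) are hypothetical, and the sketch assumes the first source is read with `System.getProperty`.

```java
import java.io.File;

// Hypothetical helper, not part of Dubbo: finds the direct-connect mapping file.
final class ResolveFileSketch {
    private ResolveFileSketch() {}

    // Returns the file to load, or null when neither source provides one.
    static File locate() {
        // 1. An explicit path given through the dubbo.resolve.file property wins.
        String path = System.getProperty("dubbo.resolve.file");
        if (path != null && !path.isEmpty()) {
            return new File(path);
        }
        // 2. Otherwise fall back to ${user.home}/dubbo-resolve.properties, if it exists.
        File fallback = new File(System.getProperty("user.home"), "dubbo-resolve.properties");
        return fallback.exists() ? fallback : null;
    }

    public static void main(String[] args) {
        System.out.println("resolve file: " + locate());
    }
}
```

Starting the JVM with `-Ddubbo.resolve.file=/path/to/file.properties` would make the first branch apply.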
1. XML configuration mode
ZookeeperClient
findConfigedHosts
toCategoriesPath(url)
true
dubbo
scanner.addIncludeFilter
ServiceBean (service provider)
ExtensionLoader.getExtensionLoader(Protocol.class)
invokers.size() == 2
JdkProxyFactory
configureBean(bean)
createPersistent
notify()
invoker.invoke()
Actually calls the StubProxyFactoryWrapper class
DubboNamespaceHandler
setApplicationContext
get
b=isInit()
clazz=Class.forName()
DefaultFuture
Invoker
registry.register(registedProviderUrl)
ZookeeperRegistry.doSubscribe()
get()
ephemeral==true
false
DubboConfigConfigurationSelector
new HeaderExchangeServer()
directory.list()
ProviderConsumerRegTable.registerProvider
ReferenceMethodElement.inject()
new ZkclientZookeeperClient(url)
clazz.newInstance()
Return instance
zkClient.addStateListener()
Look up the Class object for the given key in the local map, create an instance of it via reflection, and return it
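A minimal sketch of this lookup-and-instantiate step, under stated assumptions: `ExtensionCacheSketch` and its methods are hypothetical names, and the instance cache is an illustrative addition modeled on the cachedInstances node of this diagram rather than a copy of Dubbo's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry, not Dubbo's ExtensionLoader.
final class ExtensionCacheSketch {
    // name -> implementation class, filled while the configuration is loaded
    private final Map<String, Class<?>> classes = new ConcurrentHashMap<>();
    // name -> already created instance (assumption: one shared instance per name)
    private final Map<String, Object> instances = new ConcurrentHashMap<>();

    void register(String name, Class<?> clazz) {
        classes.put(name, clazz);
    }

    Object getExtension(String name) {
        Class<?> clazz = classes.get(name);
        if (clazz == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        // Create the instance reflectively on first use, then reuse it.
        return instances.computeIfAbsent(name, key -> {
            try {
                return clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
            }
        });
    }

    public static void main(String[] args) {
        ExtensionCacheSketch loader = new ExtensionCacheSketch();
        loader.register("list", java.util.ArrayList.class);
        System.out.println(loader.getExtension("list").getClass().getName());
    }
}
```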
ServiceAnnotationBeanPostProcessor.class
Return the Exporter
dubbo://172.19.26.72:20880/***Service?anyhost=true&application=***&bind.ip=172.19.26.72&bind.port=20880&class=***ServiceImpl&dubbo=2.6.0&generic=false&interface=***Service&methods=***&owner=**&side=provider&timestamp=***
initClient
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
Example: if dubbo is passed in, the object finally returned is ProtocolFilterWrapper, in the order filter -> listener -> dubbo. If registry is passed in, the object finally returned is ProtocolListenerWrapper, in the order listener -> filter -> registry.
referenceBean = beanBuilder.build()
ReferenceAnnotationBeanPostProcessor
super
postProcessBeanFactory
NettyClient
3. Assemble the parameter Map
Within Dubbo, only the ExtensionFactory interface uses this; one of its implementations is AdaptiveExtensionFactory
listener.notify(categoryList)
configurator.configure
Class<?> clazz =map.get(name);
Get all registry addresses
DubboInvoker
registerReferenceAnnotationBeanPostProcessor
Return the proxy object
clazz.isAnnotationPresent(Adaptive.class)
.create()
If the protocol is not injvm
new ReferenceBeanBuilder()
new DecodeHandler()
findParserForElement
new RegistryDirectory<T>
ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(extName);
m.isAnnotationPresent(Adaptive.class)
toMethodInvokers()
exportLocal
FactoryBean
Loop over all registry URLs
super.getServiceClass(ref)
module->ModuleConfig
Member variables
Concatenate class code for every annotated method
Return the compiled proxy class
cachedActivates.put()
AbstractRegistry
Exporter<?> exporter=protocol.export(wrapperInvoker)
Result.recreate()
cluster.join(
ProtocolFilterWrapper
Generate
Each file is loaded only once, and its key/value pairs are cached. Directories scanned: META-INF/dubbo/, META-INF/services/, META-INF/dubbo/internal/
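As an illustration of the key/value format in those files, here is a small sketch that parses `name=class` lines into a map. `SpiFileSketch` is a hypothetical name; the comment handling and the skipping of lines without a name are simplifying assumptions, not a description of Dubbo's loader.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for "name=fully.qualified.ClassName" lines.
final class SpiFileSketch {
    private SpiFileSketch() {}

    static Map<String, String> parse(String content) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String raw : content.split("\\R")) {
            // Drop a trailing "# comment", then surrounding whitespace.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // assumption: this sketch ignores lines without a name
            }
            String name = line.substring(0, eq).trim();
            String className = line.substring(eq + 1).trim();
            // Keep the first mapping seen for a name, so reloading cannot replace it.
            result.putIfAbsent(name, className);
        }
        return result;
    }

    public static void main(String[] args) {
        String file = "# protocols\n"
                + "dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol\n"
                + "registry=com.alibaba.dubbo.registry.integration.RegistryProtocol\n";
        System.out.println(parse(file));
    }
}
```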
getExtensionLoader()
Append all parameters to the map
return invoker.invoke(invocation)
service->ServiceBean
Loop over all Holders
GenericService.class
new ExporterChangeableWrapper<T>
If already initialized, return immediately
new ZookeeperRegistry()
JavassistProxyFactory
DubboBeanDefinitionParser
getExtension(name)
onApplicationEvent
doRegister(URL url)
2.ProtocolFilterWrapper
parse
super.doNotify
Loop over all elements in injectedElements
ReferenceConfig
Source: private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker
application->ApplicationConfig
FailoverClusterInvoker
ServiceAnnotationBeanPostProcessor
pushServiceClass()
ref =createProxy()
AbstractClient
3.RegistryProtocol
cachedAdaptiveClass
ZookeeperRegistryFactory
"registry".equals(url.getProtocol())
serviceConfig.setApplicationContext()
serviceConfig.setRegistries()
serviceConfig.setProvider()
serviceConfig.setMonitor()
serviceConfig.setApplication()
serviceConfig.setModule()
serviceConfig.setProvider()
serviceConfig.setProtocols()
serviceConfig.setTag()
cache
Loop
RootBeanDefinition
Object referenceBean=buildReferenceBean()
ProtocolUtils.isGeneric
ServiceConfig
Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
1.ProtocolListenerWrapper
Example: this part is key to understanding the essence of SPI; a must-know!
publishExportEvent();
ProtocolFilterWrapper.Invoker()
Generate the wrapper class
ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
zkClient.create()
value.length() == 0 || value.equalsIgnoreCase("false")
buildReferenceMetadata()
new ListenerExporterWrapper()
buildServiceBeanDefinition()
getChannel()
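Build the text of a Java class by string concatenation, then compile it dynamically and load it, so that the corresponding class is brought into memory; the class is a Java class that implements the extension interface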
protocol.export(invokerDelegete)
compiler.compile
getServiceClass()
Loop over all protocols to prepare for export
Server
ChildListener
Loop over all paths
doRegister()
loader.getExtension(name); name can be spi or spring
Scan all methods for those annotated with @Reference
RegistryProtocol
Create a ListenableFutureTask; the task internally creates new ZkClient()
Get all service providers
createAdaptiveExtension()
createExtension
Execute all
AbstractZookeeperClient
new ReferenceBean<Object>(annotation)
Generate the key from group/interfaceName/version
loadFile
listener.exported(this)
registerServiceAnnotationBeanPostProcessor
new AnnotationTypeFilter(Service.class)
registryCacheExecutor.execute(new SaveProperties(version))
doSubscribe()
getTransporter()
createEphemeral
Attempt to connect: connect
protocol.refer()
.newInstance()
getSharedClient
cachedAdaptiveClass != null
=registry
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
                .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
doNotify()
url = new URL(); build the injvm URL and set its protocol and host
getClients(url)
Register in the Spring container
Preset parameters (obtained from the environment): addPropertyReference()
Add to the wrappers Set
registry.subscribe()
findFieldReferenceMetadata
Look up the local machine's IP
javassist
doSelect()
new URL()
ZookeeperRegistry
clazz.getAnnotation(Activate.class)
new ExtensionLoader<T>
new URL
FailbackRegistry
Exchangers.bind
export()
postProcessAfterInitialization
needsRefresh()
SPI source code
Exchangers.connect()
put
registry
super-AbstractRegistry
AnnotationBean (annotation scanning, plus service provider and consumer handling)
ReferenceCountExchangeClient.request()
1
init();
toInvokers(invokerUrls);
FutureFilter.invoke()
consumer://10.37.129.2/xx?applicatioxx&metxhods=x&owner=xx&side=consumer&timestamp=xx
refprotocol.refer()
GenericService.class != invoker.getInterface()
saveProperties()
List
connect();
ResponseFuture
new ExchangeClient[connections]
ServiceBean, ReferenceBean, and AnnotationBean are special cases, but very important.
ServiceBean implements InitializingBean, ApplicationListener, BeanNameAware
ReferenceBean implements InitializingBean, FactoryBean
package com.alibaba.dubbo.rpc.cluster;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster"
result
new StaticDirectory()
super ReferenceConfig
providers
Obtain the Factory through SPI
ListenerInvokerWrapper.invoke()
buildInvokerChain()
InvokerInvocationHandler
ReferenceBean(服务消费者)
isWrapperClass()
for loop
Transporter= NettyTransporter
doExportUrls()
getObject()
ProviderConsumerRegTable.registerConsuemr()
invokers.size() == 1
value.startsWith("force")
AbstractAnnotationConfigBeanBuilder
injectExtension((T) getAdaptiveExtensionClass().newInstance())
B bean = doBuild()
client.start()
Remainder omitted; instantiate
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
createRegistry()
Are there any wrapper classes?
refreshInvoker(invokerUrls)
registerWithGeneratedName()
If the class carries the @Service annotation
destroyUnusedInvokers()
MonitorFilter.invoke()
configureMonitorConfig(bean)
Spring
new ServiceBean<Object>(service)
toConfigurators
registerProvider
properties.store()
initConnectStatusCheckCommand
Transporters.connect()
doRefer()
HeaderExchangeChannel.request()
Including the exported interface, methods, owner, and so on
Call in a loop
/*/dubbo-registry-*10.11.32.251:2181.cache.lock
invokers != null && invokers.size() > 0
2
InvokerWrapper
Wrap the annotated class as a RootBeanDefinition: rootBeanDefinition(ServiceBean.class)
Scan all fields for those annotated with @Reference
url = url.setProtocol(); important: change the registry protocol to zookeeper
registerServiceBean()
Load every META-INF/spring.handlers file, read and instantiate the NamespaceHandler, then call its init and parse methods
configureModuleConfig(bean)
Is it a wrapper class?
doInvoke()
Collect all setter methods and fields of the class that are annotated with @Reference
(T)proxyFactory.getProxy(invoker)
Loop over all URLs
Mark as exported: exported = true
JavassistCompiler
findServiceBeanDefinitionHolders()
AbstractClusterInvoker
openServer(url)
doOpen()
DubboBeanDefinitionParser.parse
StateListener
ExtensionLoader.getExtensionLoader(ExtensionFactory.class)
Throw an exception if there is no @Adaptive annotation
getExtensionClasses()
protocol.export()
getExtensionClasses()
Finally
Create three folders: dubbo/*Api/providers, dubbo/*Api/configurators, dubbo/*Api/routers
!local
ReferenceFieldElement.inject()
new ProviderInvokerWrapper()
bean.afterPropertiesSet()
router.route()
MockClusterInvoker
Ultimately calls the native ZkClient operation client.createEphemeral(path)
new FailoverClusterInvoker<T>(directory)
injectExtension
loadRegistries()
Return code
doConnect()
getPackagesToScan
optimizeSerialization(url)
selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0)
new InjvmExporter
3.InjvmProtocol
Class.forName(interfaceName)
zookeeper://zk01.soa.internal.xxx.com:2181/com.alibaba.dubbo.registry.RegistryService?application=xxx&default=true&dubbo=2.6.0&interface=com.alibaba.dubbo.registry.RegistryService&owner=xx&timestamp=1630993953523
Create a corresponding Holder for every BeanDefinition found
b = getConsumer().isInit()
invoke
Return an AbstractProxyInvoker
RegistryDirectory
b != null && b.booleanValue()
optimizeSerialization
BeanFactoryPostProcessor
new Request()
registerBeanDefinitionParser
Call ServiceBean's export
init()
ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type)
client = new ZkClientWrapper(); client.addListener(new IZkStateListener()
doLocalExport
afterPropertiesSet()
url.getProtocol()
new AbstractProxyInvoker<T>
list()
doRegister(url)
4.DubboProtocol
service_share_connect
Call Netty's startup method
ConfiguratorFactory
this.cluster.join(directory)
findConfigedPorts
lazy=true
super()
appendParameters
HeaderExchangeClient.request()
Look up the configured port; the default is 20880
addApplicationListener
new LazyConnectExchangeClient()
serviceConfig.export();
!remote
annotation->AnnotationBean
new InvokerDelegete<T>
syncSaveFile=true
Fetch from the local cache methodInvokerMap
doSaveProperties
configureRegistryConfigs(bean)
Version 2.6.5 and above
zookeeper://zk01.soa.internal.x.x.com:2181/com.alibaba.dubbo.registry.RegistryService?application=ruyi&dubbo=2.6.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=2952&timestamp=1631083887128
Invocation flow
bindIp, bindPort, bindAddress
getAdaptiveExtensionClass()
File operations omitted
new HeaderExchangeClient()
bootstrap.connect()
LoadBalance loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(invocation.getMethodName()
serviceKey(url)
6.register
resolvePackagesToScan()
LoadBalance loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
new ChannelHandlerDispatcher(handlers)
zkClient=zookeeperTransporter.connect(url)
Return the last wrapper instance
URL monitorUrl = loadMonitor(registryURL)
reference->ReferenceBean
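Dubbo provides the @Adaptive annotation, which can be placed either on an implementation class of an interface or on an interface method. On an implementation class, it means the adaptive mechanism is implemented by that class in its own custom way. On a method, it means Dubbo automatically generates a subclass for the interface and overrides that method following a fixed pattern, while the remaining methods without @Adaptive throw an exception by default.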
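The method-level behavior can be illustrated with a hand-written stand-in for such a generated subclass. Everything in this sketch is hypothetical (`GreetingProtocol`, `DubboGreeting`, `InjvmGreeting`, `GreetingProtocolAdaptive`), and a plain `Map` stands in for Dubbo's URL; it only mirrors the dispatch pattern, not the code Dubbo generates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extension interface; export plays the role of the @Adaptive method.
interface GreetingProtocol {
    String export(Map<String, String> url);
    int getDefaultPort(); // plays the role of a method without @Adaptive
}

final class DubboGreeting implements GreetingProtocol {
    public String export(Map<String, String> url) { return "exported via dubbo"; }
    public int getDefaultPort() { return 20880; }
}

final class InjvmGreeting implements GreetingProtocol {
    public String export(Map<String, String> url) { return "exported via injvm"; }
    public int getDefaultPort() { return 0; }
}

// Hand-written equivalent of an adaptive subclass: it picks the real
// implementation at call time from a parameter carried by the url.
final class GreetingProtocolAdaptive implements GreetingProtocol {
    private final Map<String, GreetingProtocol> extensions = new HashMap<>();

    GreetingProtocolAdaptive() {
        extensions.put("dubbo", new DubboGreeting());
        extensions.put("injvm", new InjvmGreeting());
    }

    public String export(Map<String, String> url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String protocol = url.get("protocol");
        String extName = (protocol == null ? "dubbo" : protocol); // default name
        GreetingProtocol extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.export(url);
    }

    public int getDefaultPort() {
        // Methods that are not adaptive are rejected outright.
        throw new UnsupportedOperationException("getDefaultPort is not an adaptive method");
    }
}

final class AdaptiveSketchDemo {
    public static void main(String[] args) {
        Map<String, String> url = new HashMap<>();
        url.put("protocol", "injvm");
        System.out.println(new GreetingProtocolAdaptive().export(url));
    }
}
```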
NioClientSocketChannel
4.RegistryProtocol
Return the ZookeeperRegistry
doSelect()
StubProxyFactoryWrapper
cluster.join()
If not yet exported
getAdaptiveExtension()
new Holder<Object>()
doList()
registry->RegistryConfig
URL replacement
routers
doExportUrlsFor1Protocol
type.getAnnotation(SPI.class)
NettyChannel
loadExtensionClasses()
initialized=true
registryFactory.getRegistry(url)
setRouters
Put into a scheduled job, then retry()
new InvokerDelegate()
injectExtension()
Cache: cachedInstances
AbstractDirectory
scanner.doScan(packageToScan)
toMergeMethodInvokerMap()
Checks for other tag objects
metadata=findReferenceMetadata()
NettyTransporter.bind
postProcessBeforeInitialization
ReferenceBean.get()
com.alibaba.dubbo.common.bytecode.Proxy return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
Create a thread with new Thread(), hand it the ListenableFutureTask, then call the thread's start method
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref))
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()
Loop over all Clients
toUrlPath(url)
new MockClusterInvoker
directory=RegistryDirectory
createServer(url)
Return cachedAdaptiveClass
/dubbo/xxxTradeQueryApi/consumers/consumer%3A%2F%2F10.37.129.2%2Fcom.*.*Api%3Fapplication%3D**%26category%3Dconsumers%26check%3Dfalse%26default.check%3Dfalse%26default.retries%3D0%26default.timeout%3D5000%26dubbo%3D2.6.0%26interface%3D*.*.*Api%26methods%3D*owner%3Dunknown%26pid%3D12769%26revision%3D3.0.5-20200922.080747-4%26side%3Dconsumer%26timestamp%3D1630583697415
new ListenerInvokerWrapper<T>
reader.readLine()
monitor->MonitorConfig
ReferenceAnnotationBeanPostProcessor.class
injvm
@DubboComponentScan
new Exporter<T>()
buildInvokerChain
Default Filters
send()
Compiler
Result
provider->ProviderConfig
5.DubboProtocol
MockInvokersSelector
Retry failed URLs
MockClusterWrapper
new DubboExporter()
postProcessPropertyValues
configureApplicationConfig(bean)
loadProperties()
AdaptiveExtensionFactory
AopUtils.getTargetClass(ref)
Object value=refer()
ZookeeperClient zkClient
isJvmRefer
Loop over the folders
ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
proxyFactory
invokers.get(0)
wrapperClasses.size() > 0
ResponseFuture.get()
Instantiate the parser; each tag needs its own DubboBeanDefinitionParser instance
stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper (wrapper class)
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
Example: if jdk is passed in, the object finally returned is StubProxyFactoryWrapper, in the order stub -> jdk. If javassist is passed in, the object finally returned is StubProxyFactoryWrapper, in the order stub -> javassist.
classLoader.getResources()
By default a provider is selected at random; the default implementation is the subclass RandomLoadBalance
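A minimal sketch of random selection, assuming each candidate carries an integer weight. `RandomSelectSketch` and its `select` method are hypothetical, and taking weights into account is an assumption of this sketch; the note above only states that the choice is random.

```java
import java.util.Random;

// Hypothetical selector: returns the index of the chosen candidate.
final class RandomSelectSketch {
    private RandomSelectSketch() {}

    static int select(int[] weights, Random random) {
        if (weights.length == 0) {
            throw new IllegalArgumentException("no candidates");
        }
        int total = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                sameWeight = false;
            }
        }
        if (total > 0 && !sameWeight) {
            // Pick a point in [0, total) and find the candidate whose span contains it.
            int offset = random.nextInt(total);
            for (int i = 0; i < weights.length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return i;
                }
            }
        }
        // Equal (or all-zero) weights: plain uniform choice.
        return random.nextInt(weights.length);
    }

    public static void main(String[] args) {
        int[] weights = {100, 200, 100};
        System.out.println("selected index: " + select(weights, new Random()));
    }
}
```

Passing the `Random` in as a parameter keeps the selection easy to test with a fixed seed.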
cluster.join(directory)
@EnableDubboConfig
.hasExtension(url.getProtocol())
new DubboInvoker<T>
AopUtils.isAopProxy(ref)
scanner.findCandidateComponents
connect
new DubboClassPathBeanDefinitionScanner()
directory.subscribe()
Return the map extensionClasses
doExport()
referenceConfig.afterPropertiesSet()
registry.register()
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension()
Find all listeners
AdaptiveCompiler
registerServiceBeans()
ConsumerContextFilter.invoke()
registryFactory.getRegistry(registryUrl)
"registry".equals(url.getProtocol())
loadRegistries
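clazz.getConstructor(type): checks whether a constructor taking this type exists; if it does, the class is a wrapper. Newer versions extract this check into its own method, while older versions keep it inline (weimob)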
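The constructor check can be sketched in isolation like this. The types `Greeter`, `PlainGreeter`, `LoudGreeter`, and `WrapperCheckSketch` are hypothetical and exist only for this illustration; the one real API relied on is `Class.getConstructor`, which throws `NoSuchMethodException` when no matching public constructor exists.

```java
// Hypothetical extension interface and implementations for illustration.
interface Greeter {
    String greet();
}

final class PlainGreeter implements Greeter {
    public PlainGreeter() {}
    public String greet() { return "hello"; }
}

// A wrapper: its public constructor takes the extension interface itself.
final class LoudGreeter implements Greeter {
    private final Greeter delegate;
    public LoudGreeter(Greeter delegate) { this.delegate = delegate; }
    public String greet() { return delegate.greet() + "!"; }
}

final class WrapperCheckSketch {
    private WrapperCheckSketch() {}

    // True when clazz has a public constructor accepting the extension type.
    static boolean isWrapperClass(Class<?> clazz, Class<?> type) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Wraps an instance by invoking that constructor reflectively.
    static <T> T wrap(Class<? extends T> wrapperClass, Class<T> type, T instance) {
        try {
            return wrapperClass.getConstructor(type).newInstance(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot wrap with " + wrapperClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        Greeter wrapped = wrap(LoudGreeter.class, Greeter.class, new PlainGreeter());
        System.out.println(wrapped.greet());
    }
}
```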
loader.getSupportedExtensions()
registry://zk01.soa.internal.xxx.com:2181/com.alibaba.dubbo.registry.RegistryService?application=xx&default=true&dubbo=2.6.0&owner=xx&registry=zookeeper&timestamp=1630994064452
channel.write()
ExtensionLoader.getExtensionLoader(Transporter.class)
ZkclientZookeeperTransporter
DubboComponentScanRegistrar
Assign directly to the member variable: cachedAdaptiveClass = clazz;
zkClient.addChildListener()
Get all methods: type.getMethods()
postProcessBeanDefinitionRegistry
What is actually returned is AdaptiveExtensionFactory, which is a special case
ExtensionLoader.getExtensionLoader(DataStore.class)
Construct
select()
new ReferenceBean<Object>(reference)
merge override parameters
referenceConfigs
For each tag, look up its attributes and set them on the object
referenceConfig.setApplicationContext()
referenceConfig.setRegistries()
referenceConfig.setProvider()
referenceConfig.setMonitor()
referenceConfig.setApplication()
referenceConfig.setModule()
referenceConfig.setProvider()
referenceConfig.setProtocols()
createAdaptiveExtensionClass()
=dubbo
Get the value
Call the Netty client method
serviceConfig.afterPropertiesSet()
Transporters.bind
Invoker<?> invoker = proxyFactory.getInvoker()
Call notify after the loop
new DefaultFuture
Does the url contain the stub parameter?
protocol->ProtocolConfig
url
new NettyClient()
createAdaptiveExtensionClassCode()
Result result = invoker.invoke(invocation)
AbstractInvoker.invoke()
elements=findMethodReferenceMetadata()
referenceConfigs.get(key)
Register the generated BeanDefinition with the Spring factory to await instantiation: getRegistry().registerBeanDefinition
2. EnableDubbo configuration mode
consumer->ConsumerConfig
proxyFactory.getProxy(invoker)
Corresponding tag