Dubbo源码导读
2022-02-17 10:29:15 49 举报
AI智能生成
Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三个关键功能:面向接口的远程方法调用,容错和负载均衡,以及自动服务注册和发现。通过阅读Dubbo源码,我们可以深入理解其内部工作原理,包括如何实现这些核心功能。此外,Dubbo源码还展示了许多优秀的编程实践,如模块化设计、良好的命名规范和注释等。总之,阅读Dubbo源码不仅有助于提高我们的技术水平,还能为我们提供一个很好的学习机会。
作者其他创作
大纲/内容
架构
节点角色
Provider 暴露服务的服务提供方
Consumer 调用远程服务的服务消费方
Registry 服务注册于发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器
调用关系
服务容器负责启动,加载,运行服务提供者。
服务提供者在启动时,向注册中心注册自己提供的服务
服务消费者在启动时,向注册中心订阅自己所需的服务
注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者
服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用
服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
SPI
1.配置文件放置在META-INF/dubbo路径下
2.ExtensionLoader.getExtensionLoader(Robot.class)获取ExtensionLoader实例
3.extensionLoader.getExtension('optimusPrime)加载扩展类实例对象
1.检查缓存中是否有目标对象,如果有直接返回,否则创建拓展对象
2.createExtension('optimusPrime')
1.getExtensionClasses从配置文件中加载所有扩展类,得到'配置项名称'到'配置类'的映射关系表
2.通过反射创建扩展对象
3.向拓展对象中注入依赖
4.将拓展对象包裹在响应的Wrapper对象中
自适应拓展机制
Adaptive注解
通过ExtensionLoader.getExtensionLoader(FruitGranter.class).getAdaptiveExtension()获取一个FrutiGranter对象
1.调用 getAdaptiveExtensionClass 方法获取自适应拓展 Class 对象
1.调用 getExtensionClasses 获取所有的拓展类
2.检查缓存,若缓存不为空,则返回缓存
3.若缓存为空,则调用 createAdaptiveExtensionClass 创建自适应拓展类
1.构建自适应拓展代码
1.Adaptive注解检测
2.生成类
3.生成方法
1.无 Adaptive 注解方法代码生成逻辑
2.获取URL数据
3.获取Adapative注解值
4.检查Invocation参数
5.生成拓展名获取逻辑
6.生成拓展加载与目标方法调用逻辑
7.生成完整的方法
2.获取编译器实现类
3.编译代码,生成class
2.通过反射进行实例化
3.调用 injectExtension 方法向拓展实例中注入依赖
使用该FruitGranter调用其自适应标注的方法,获取调用结果
服务注册
1.前置工作
1.检查配置
2.多协议多注册中心导出服务
加载注册中心链接
1.检测是否存在注册中心配置类,不存在则抛出异常
2.构建参数映射集合,也就是 map
3.构建注册中心链接列表
4.遍历链接列表,并根据条件决定是否将其添加到 registryList 中
组装URL
2.导出服务
1.Invoker创建过程
1.为目标类创建wrapper
1.初始化操作
2.为 public 级别的成员变量生成条件判断取值与赋值代码
3.为定义在当前类中的方法生成判断语句,和方法调用语句
4.处理 getter、setter 以及以 is/has/can 开头的方法
5.通过 ClassGenerator 为刚刚生成的代码构建 Class 类,并通过反射创建对象
2.创建匿名Invoker类对象,并实现doInvoke方法
2.导出服务到本地
1.创建新的URL,设置协议头为injvm,设置host为本地,端口为0,其余不变
2.创建Invoker
3.调用InjvmProtocol.export(invoker)方法导出服务
3.导出服务到远程
1.服务导出
1.获取服务标识
2.创建DubboExpoter
3.将<key,exporter>键值放入缓存中
4.本地存根相关代码
5.启动服务器
1.获取host:port
2.服务器实例创建
1.检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
2.创建服务器实例
1.创建boss和worker线程池
2.创建ServerBootstrap
3.设置pipelineFactory
4.绑定指定的ip和端口上
子主题
3.检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常
2.服务注册
1.获取注册中心实例
1.创建Zoookeeper客户端
2.向注册中心注册服务
服务发现
处理配置
检测或创建ConsumerConfig
加载interfaceClass
从系统属性或者配置文件中加载与接口名相对应的配置,并将解析结果赋值给url字段
检查核心配置类是否为空,如果为空则从其他配置类中获取
收集各种配置,并将配置存储到map中
处理MethodConfig实例
解析服务消费者ip,以及调用createProxy创建代理对象
引用服务
检查是什么类型调用
创建Invoker
DubboProtocol.refer
getClients获取客户端实例
默认共享客户端
getSharedClient
initClient
获取客户端类型,默认netty
添加编解码和心跳包参数到url中
检测客户端类型是否存在,不存在则抛出异常
创建ExchangeClient实例
RegisterProtocol.refer
取registry参数值,将其设置为协议头
获取注册中心实例
获取group配置,根据group配置决定doRefer第一个参数Cluster类型
doRefer
创建RegistryDirectory实例
设置注册中心和协议
生成消费者链接
注册服务消费者,在consumers目录下的新节点
订阅providers,configurators,rotors等节点数据
一个服务可能有多个提供者,Cluster将多个服务提供者合并成一个,并生成Invoker
创建代理
获取接口列表
Proxy.getProxy获取Proxy子类
创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例
遍历接口列表
检测类型是否为接口
重新加载接口类
缓存中加载是否有该接口类代理
创建ccp,为服务接口生成代理类
创建ccm
通过ccm创建的类,调用newInstance()反射创建Proxy实例
服务目录
AbstractDirectory
list
1.doList方法列举Invokers,doList是模板方法,由子类实现
2.根据Router的getUrl返回值为空与否,以及runtime参数决定是否进行服务路由
StaticDirectory
RegistryDirectory
列举Invoker,doList方法
接收服务变更通知
1.获取url中的categoty参数
2.根据category参数将url分别放到不同的列表中
1.添加路由器url
2.添加配置器url
3.添加服务提供者url
roConfigurators将url列表转成Configurator列表
toRouters将url列表转成Router列表
调用 refreshInvoker 方法刷新 Invoker 列表
invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
将url转成Invoker
检测消费端是否支持url服务端协议配置
查询缓存是否有url对应的Invoker
新建Invoker
将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
1.从Invoker中获取methods参数
2.切分methods参数值,得到方法名数组
3.以方法名为键,Invoker 列表为值,将映射关系存储到 newMethodInvokerMap 中
4.进行方法级别路由
5.对 Invoker 列表进行排序,并转成不可变列表
合并多个组的 Invoker
1.从方法名中获取invokers
2.从invoker中获取分组配置
3.生成group到Invoker列表的映射关系
4.若有多组,通过集合类合并每组Invoker
销毁无用Invoker
服务路由
表达式解析
获取路由器规则
定位=>分隔符
分别获取服务消费者和提供者匹配规则
解析服务消费者匹配规则
通过正则表达式匹配路由规则
根据分隔符将分割结果存储
解析服务提供者匹配规则
将解析出的匹配规则分别赋值给 whenCondition 和 thenCondition 成员变量
服务路由
对服务消费者条件进行匹配,如果匹配失败则表明服务消费者url不符合匹配规则
将服务提供者或者消费者url转成map
遍历condition列表,获取匹配项名称比如host、method等
如果invocation不为空,且key为method(s),表示进行方法匹配
invocation为空,从map中获取key指定字段
对提取的字段进行过滤
1、match非空,mismatch为空
1、match空,mismatch为空
1、match非空,mismatch非空
1、match空,mismatch空
若服务提供者未配置,则指定的消费者禁用服务
对invoker服务提供者进行筛选,若符合加入result
集群
集群容错
1.服务消费者初始化期间,集群Cluster实现类为服务消费者创建Cluster Invoker实例,图中的merge操作
2.服务消费者远程调用时
源码分析
1.Cluster实现类分析
FailoverCluster
FailbackCluster
2.Cluster Invoker分析
父类invoke方法
1.列举Invoker
2.加载LoadBalance
3.调用doInvoke进行后续操作,该方法是抽象方法,由子类实现
FailoverClusterInvoker
1.获取重试次数
2.根据重试次数,循环调用,失败重试
3.重新列举服务Invoker
4.通过负载均衡选择Invoker
1.获取调用方法名
2.获取sticky配置
3.检测stickyInvoker是否在invokers列表中
4.是否返回stickyInvoker
5.stickyInvoker不可用,doSelect选择Invoker
1.选择loadBalance
2.通过负载均衡组件选择Invoker
3.如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
1.查找可用的Invoker,并将其添加到reselectInvokers集合中
2.如果reselectInvokers不为空,则通过负载均衡组件再次选择
4.如果reselect选出来的invoker为空,则取invoker顺序加一的invoker
6.如果sticky为true,则将负载均衡组件选出来的Invoker赋值给stickyInvoker
5.添加到invoker到invoked列表中
6.设置invoked到RPC上下文中
7.调用目标Invoker的invoke方法
FailbackClusterInvoker
1.选择Invoker
2.进行调用
3.如果调用失败将调用信息存入到failed中,等待定时重传
FailfastClusterInvoker
FailsafeClusterInvoker
doInvoke
ForkingClusterInvoker
获取forks配置,并循环选出forks个Invoker,并添加到selected中
通过线程池,遍历selected并发调用多个Invoker,并将结果存储到阻塞队列中
从阻塞队列中获取结果,并对返回结果类型进行判断。如果为异常类型,则直接抛出,否则返回
如果全部调用失败,则阻塞队列中添加的是异常,从阻塞队列中取出数据时先判断是否为异常类型
BroadcastClusterInvoker
负载均衡
AbstractLoadBalance
select
1.检查invokers集合的合法性
2.如果invokers列表中只含有一个Invoker,直接返回
3.调用doSelect方法进行负载均衡,该方法为抽象方法,由子类实现
RandomLoadBanlance
doSelect
1.计算总权重totalWeight
2.检测每个服务提供者的权重是否相同,如果相同随机选一个
3.获取随机数,并计算落到的区间
LeastActiveLoadBalance
doSelect
遍历 invokers 列表,寻找活跃数最小的 Invoker
如果有多个 Invoker 具有相同的最小活跃数,此时记录下这些 Invoker 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等
如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可
ConsistentHashLoadBalance
doSelect
1.获取invokers原始的hashcode
2.selectors.get(key)
3.固若hashcode不同则发生了变化,重新缓存
一致性 hash 选择器 ConsistentHashSelector 的初始化
获取虚拟节点数,默认是160,即每个invoker在圆环上有多少节点
获取参与hash计算的参数默认下标值,默认对第一个参数进行hash计算
遍历invoker
获取address,并对address+i进行MD5计算,得到长度16字节的数组
对MD5部分字节新型4次hash计算,得到四个不同的long型正整数
将hash到invoker的映射关系存储到virtualInvokers中
4.调用 ConsistentHashSelector 的 select 方法选择 Invoker
selector.select
对参数进行md5以及hash计算,得到hash值
根据hash值到TreeMap中查找目标的Invoker
RoundRobinLoadBalance
doSelect
服务调用过程
调用过程
服务调用方式
同步
发送请求,得到一个ResponseFuture实例,并调用该实例的get方法进行等待
DefaultFuture#get
异步
异步有返回值
发送请求,得到一个ResponseFuture实例
设置Future到上下文中
暂时返回一个空结果
异步无返回值
发送请求
设置上下文的future字段为null
返回一个空的RpcResult
服务消费方发送请求
发送请求
ReferenceCountExchangeClient
request
HeaderExchangeClient
request
HeaderExchangeChannel
request
AbstractPeer
send
AbstractClient
send
获取Cahnnel,getChannel是一个抽象方法由子类实现
channel.send(message,sent)
NettyChannel
send
发送消息(包含请求和响应消息)
如果sent为true,则需要等待消息发出,若在规定时间没能发出,success会被置为false
若success为false,抛出异常
请求编码
ExchangeCodec
encodeRequest
创建消息头字节数组,长度为16
设置魔法数0xdabb
设置数据包类型(request/response)和序列化器编号
设置通信方式(单向/双向)
设置事件标识
设置请求编号,8个字节,从第4个字节开始设置
创建序列化器,比如Hessian2ObjectOutput
对请求数据进行序列化操作
DubboCodec#encodeRequestData
依次序列化 dubbo version、path、version
序列化调用方法名
将参数类型转换为字符串,并进行序列化
对运行时参数进行序列化
序列化 attachments
把消息体长度写入到消息头中
服务提供方接收请求
请求解码
检查魔数是否相等
检查可读数据量是否小于消息头长度,若小于则立即返回DecodeResult.NEED_MORE_INPUT
从消息头中获取消息体长度
检测消息体长度是否超出限制,超出则抛出异常
检测可读的字节数是否小于实际的字节数
读取实际字节长度,继续进行解码工作
获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
获取调用编号
通过逻辑与运算得到调用类型,0 - Response,1 - Request
创建Request对象
通过逻辑与运算得到通信方式,并设置到 Request 对象中
通过位运算检测数据包是否为事件类型,如心跳事件
根据 url 参数判断是否在 IO 线程上对消息体进行解码
在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
调用方法名、attachment、以及调用参数解析出来
调用方法名、attachment、以及调用参数解析出来
通过反序列化得到dubbo version,path,version并保存到attachments变量中
通过反序列换得到调用方法名
通过反序列化得到参数类型字符串,如Ljava/lang/String
将参数字符串解析成字符数组,并解析运行时参数,设置参数类型数组
设置参数列表
仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
设置data到Request对象中
解码完成得到一个完整的Request对象,对象会被传到先一个入站处理器
调用服务
NettyHandler.messageReceived
获取NettyChannel
继续向下调用
线程派发模型
AllChannelHandler.received
将请求和响应消息派发到线程池中处理
调用服务
ChannelEventRunnable#run
检测通道状态,对于请求或者响应效益,此时state=RECEIVED
将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
其他消息类型通过 switch 进行处理
DecodeHandler#received(Channel,Object)
HeaderExchangeHandler#received(Channel, Object)
处理请求对象
对于普通请求
双向通信
向后调用服务,并得到调用结果
handleRequest
检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
获取 data 字段值,也就是 RpcInvocation 对象
继续向下调用
设置 OK 状态码
设置调用结果
将调用结果返回给服务消费端
如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
DubboProtocol.requestHandler#reply(ExchangeChannel,Object)
获取Invoker实例
计算service key
从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象
获取 Invoker 对象,并返回
通过Invoker调用具体的服务
AbstractProxyInvoker#invoke(Invocation)
调用doInvoke执行后续的调用,并将调用结果封装到RpcResult中
JavassistProxyFactory#getInvoker
创建匿名类对象
调用invokeMethod方法进行后续的调用
服务提供方返回调用结果
ExchangeCodec#encodeResponse
。。。
对调用结果进行序列化操作
DubboCodec#encodeResponseData
。。。
服务消费方接收调用结果
响应数据解码
与请求解码过程类似
DubboCodec#decodeBody
获取请求编号
检测消息类型,若下面的条件成立,表明消息类型为 Response
创建 Response 对象
根据 url 参数决定是否在 IO 线程上执行解码逻辑
创建 DecodeableRpcResult 对象
进行后续的解码工作
执行反序列化操作
向用户线程传递调用结果
HandlerExchangeHandler#received
处理响应
DefaultFuture#received(Channel, Response)
根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
保存响应对象到相应的DefaultFuture实例中
唤醒用户线程,随后用户线程即可从DefaultFuture实例中获取到相应的结果
0 条评论
下一页