Dubbo源码导读
2022-02-17 10:29:15 49 举报
AI智能生成
Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三个关键功能:面向接口的远程方法调用,容错和负载均衡,以及自动服务注册和发现。通过阅读Dubbo源码,我们可以深入理解其内部工作原理,包括如何实现这些核心功能。此外,Dubbo源码还展示了许多优秀的编程实践,如模块化设计、良好的命名规范和注释等。总之,阅读Dubbo源码不仅有助于提高我们的技术水平,还能为我们提供一个很好的学习机会。
作者其他创作
大纲/内容
Provider 暴露服务的服务提供方
Consumer 调用远程服务的服务消费方
Registry 服务注册于发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器
节点角色
服务容器负责启动,加载,运行服务提供者。
服务提供者在启动时,向注册中心注册自己提供的服务
服务消费者在启动时,向注册中心订阅自己所需的服务
注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者
服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用
服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
调用关系
架构
1.配置文件放置在META-INF/dubbo路径下
2.ExtensionLoader.getExtensionLoader(Robot.class)获取ExtensionLoader实例
1.检查缓存中是否有目标对象,如果有直接返回,否则创建拓展对象
1.getExtensionClasses从配置文件中加载所有扩展类,得到'配置项名称'到'配置类'的映射关系表
2.通过反射创建扩展对象
3.向拓展对象中注入依赖
4.将拓展对象包裹在响应的Wrapper对象中
2.createExtension('optimusPrime')
3.extensionLoader.getExtension('optimusPrime)加载扩展类实例对象
SPI
Adaptive注解
1.调用 getExtensionClasses 获取所有的拓展类
2.检查缓存,若缓存不为空,则返回缓存
1.Adaptive注解检测
2.生成类
1.无 Adaptive 注解方法代码生成逻辑
2.获取URL数据
3.获取Adapative注解值
4.检查Invocation参数
5.生成拓展名获取逻辑
6.生成拓展加载与目标方法调用逻辑
7.生成完整的方法
3.生成方法
1.构建自适应拓展代码
2.获取编译器实现类
3.编译代码,生成class
3.若缓存为空,则调用 createAdaptiveExtensionClass 创建自适应拓展类
1.调用 getAdaptiveExtensionClass 方法获取自适应拓展 Class 对象
2.通过反射进行实例化
3.调用 injectExtension 方法向拓展实例中注入依赖
通过ExtensionLoader.getExtensionLoader(FruitGranter.class).getAdaptiveExtension()获取一个FrutiGranter对象
使用该FruitGranter调用其自适应标注的方法,获取调用结果
自适应拓展机制
1.检查配置
1.检测是否存在注册中心配置类,不存在则抛出异常
2.构建参数映射集合,也就是 map
3.构建注册中心链接列表
4.遍历链接列表,并根据条件决定是否将其添加到 registryList 中
加载注册中心链接
组装URL
2.多协议多注册中心导出服务
1.前置工作
1.初始化操作
2.为 public 级别的成员变量生成条件判断取值与赋值代码
3.为定义在当前类中的方法生成判断语句,和方法调用语句
4.处理 getter、setter 以及以 is/has/can 开头的方法
5.通过 ClassGenerator 为刚刚生成的代码构建 Class 类,并通过反射创建对象
1.为目标类创建wrapper
2.创建匿名Invoker类对象,并实现doInvoke方法
1.Invoker创建过程
1.创建新的URL,设置协议头为injvm,设置host为本地,端口为0,其余不变
2.创建Invoker
3.调用InjvmProtocol.export(invoker)方法导出服务
2.导出服务到本地
1.获取服务标识
2.创建DubboExpoter
4.本地存根相关代码
1.获取host:port
1.检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
1.创建boss和worker线程池
2.创建ServerBootstrap
3.设置pipelineFactory
4.绑定指定的ip和端口上
子主题
2.创建服务器实例
3.检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常
2.服务器实例创建
5.启动服务器
1.服务导出
1.创建Zoookeeper客户端
1.获取注册中心实例
2.向注册中心注册服务
2.服务注册
3.导出服务到远程
2.导出服务
服务注册
检测或创建ConsumerConfig
加载interfaceClass
从系统属性或者配置文件中加载与接口名相对应的配置,并将解析结果赋值给url字段
检查核心配置类是否为空,如果为空则从其他配置类中获取
收集各种配置,并将配置存储到map中
处理MethodConfig实例
解析服务消费者ip,以及调用createProxy创建代理对象
处理配置
检查是什么类型调用
默认共享客户端
获取客户端类型,默认netty
添加编解码和心跳包参数到url中
检测客户端类型是否存在,不存在则抛出异常
创建ExchangeClient实例
initClient
getSharedClient
getClients获取客户端实例
DubboProtocol.refer
取registry参数值,将其设置为协议头
获取注册中心实例
获取group配置,根据group配置决定doRefer第一个参数Cluster类型
创建RegistryDirectory实例
设置注册中心和协议
生成消费者链接
注册服务消费者,在consumers目录下的新节点
订阅providers,configurators,rotors等节点数据
一个服务可能有多个提供者,Cluster将多个服务提供者合并成一个,并生成Invoker
doRefer
RegisterProtocol.refer
创建Invoker
获取接口列表
Proxy.getProxy获取Proxy子类
遍历接口列表
检测类型是否为接口
重新加载接口类
缓存中加载是否有该接口类代理
创建ccm
通过ccm创建的类,调用newInstance()反射创建Proxy实例
创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例
创建代理
引用服务
服务发现
2.根据Router的getUrl返回值为空与否,以及runtime参数决定是否进行服务路由
list
AbstractDirectory
StaticDirectory
列举Invoker,doList方法
1.获取url中的categoty参数
1.添加路由器url
2.添加配置器url
3.添加服务提供者url
2.根据category参数将url分别放到不同的列表中
接收服务变更通知
roConfigurators将url列表转成Configurator列表
toRouters将url列表转成Router列表
invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
检测消费端是否支持url服务端协议配置
查询缓存是否有url对应的Invoker
新建Invoker
将url转成Invoker
1.从Invoker中获取methods参数
2.切分methods参数值,得到方法名数组
3.以方法名为键,Invoker 列表为值,将映射关系存储到 newMethodInvokerMap 中
4.进行方法级别路由
5.对 Invoker 列表进行排序,并转成不可变列表
将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
1.从方法名中获取invokers
2.从invoker中获取分组配置
3.生成group到Invoker列表的映射关系
4.若有多组,通过集合类合并每组Invoker
合并多个组的 Invoker
销毁无用Invoker
调用 refreshInvoker 方法刷新 Invoker 列表
RegistryDirectory
服务目录
获取路由器规则
定位=>分隔符
分别获取服务消费者和提供者匹配规则
通过正则表达式匹配路由规则
根据分隔符将分割结果存储
解析服务消费者匹配规则
解析服务提供者匹配规则
将解析出的匹配规则分别赋值给 whenCondition 和 thenCondition 成员变量
表达式解析
将服务提供者或者消费者url转成map
遍历condition列表,获取匹配项名称比如host、method等
invocation为空,从map中获取key指定字段
1、match非空,mismatch为空
1、match空,mismatch为空
1、match非空,mismatch非空
1、match空,mismatch空
对提取的字段进行过滤
对服务消费者条件进行匹配,如果匹配失败则表明服务消费者url不符合匹配规则
若服务提供者未配置,则指定的消费者禁用服务
对invoker服务提供者进行筛选,若符合加入result
服务路由
1.服务消费者初始化期间,集群Cluster实现类为服务消费者创建Cluster Invoker实例,图中的merge操作
2.服务消费者远程调用时
集群容错
FailoverCluster
FailbackCluster
1.Cluster实现类分析
1.列举Invoker
2.加载LoadBalance
3.调用doInvoke进行后续操作,该方法是抽象方法,由子类实现
父类invoke方法
1.获取重试次数
2.根据重试次数,循环调用,失败重试
3.重新列举服务Invoker
1.获取调用方法名
2.获取sticky配置
3.检测stickyInvoker是否在invokers列表中
4.是否返回stickyInvoker
1.选择loadBalance
2.通过负载均衡组件选择Invoker
1.查找可用的Invoker,并将其添加到reselectInvokers集合中
2.如果reselectInvokers不为空,则通过负载均衡组件再次选择
3.如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
4.如果reselect选出来的invoker为空,则取invoker顺序加一的invoker
5.stickyInvoker不可用,doSelect选择Invoker
6.如果sticky为true,则将负载均衡组件选出来的Invoker赋值给stickyInvoker
4.通过负载均衡选择Invoker
5.添加到invoker到invoked列表中
6.设置invoked到RPC上下文中
7.调用目标Invoker的invoke方法
FailoverClusterInvoker
1.选择Invoker
2.进行调用
3.如果调用失败将调用信息存入到failed中,等待定时重传
FailbackClusterInvoker
FailfastClusterInvoker
doInvoke
FailsafeClusterInvoker
获取forks配置,并循环选出forks个Invoker,并添加到selected中
通过线程池,遍历selected并发调用多个Invoker,并将结果存储到阻塞队列中
从阻塞队列中获取结果,并对返回结果类型进行判断。如果为异常类型,则直接抛出,否则返回
如果全部调用失败,则阻塞队列中添加的是异常,从阻塞队列中取出数据时先判断是否为异常类型
ForkingClusterInvoker
BroadcastClusterInvoker
2.Cluster Invoker分析
源码分析
集群
1.检查invokers集合的合法性
2.如果invokers列表中只含有一个Invoker,直接返回
3.调用doSelect方法进行负载均衡,该方法为抽象方法,由子类实现
select
AbstractLoadBalance
1.计算总权重totalWeight
2.检测每个服务提供者的权重是否相同,如果相同随机选一个
3.获取随机数,并计算落到的区间
doSelect
RandomLoadBanlance
遍历 invokers 列表,寻找活跃数最小的 Invoker
如果有多个 Invoker 具有相同的最小活跃数,此时记录下这些 Invoker 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等
如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可
LeastActiveLoadBalance
1.获取invokers原始的hashcode
2.selectors.get(key)
获取参与hash计算的参数默认下标值,默认对第一个参数进行hash计算
对MD5部分字节新型4次hash计算,得到四个不同的long型正整数
将hash到invoker的映射关系存储到virtualInvokers中
遍历invoker
一致性 hash 选择器 ConsistentHashSelector 的初始化
3.固若hashcode不同则发生了变化,重新缓存
对参数进行md5以及hash计算,得到hash值
根据hash值到TreeMap中查找目标的Invoker
selector.select
4.调用 ConsistentHashSelector 的 select 方法选择 Invoker
ConsistentHashLoadBalance
RoundRobinLoadBalance
负载均衡
DefaultFuture#get
发送请求,得到一个ResponseFuture实例,并调用该实例的get方法进行等待
同步
发送请求,得到一个ResponseFuture实例
设置Future到上下文中
暂时返回一个空结果
异步有返回值
发送请求
设置上下文的future字段为null
返回一个空的RpcResult
异步无返回值
异步
服务调用方式
request
ReferenceCountExchangeClient
HeaderExchangeClient
HeaderExchangeChannel
send
AbstractPeer
获取Cahnnel,getChannel是一个抽象方法由子类实现
AbstractClient
发送消息(包含请求和响应消息)
如果sent为true,则需要等待消息发出,若在规定时间没能发出,success会被置为false
若success为false,抛出异常
NettyChannel
创建消息头字节数组,长度为16
设置魔法数0xdabb
设置数据包类型(request/response)和序列化器编号
设置通信方式(单向/双向)
设置事件标识
设置请求编号,8个字节,从第4个字节开始设置
创建序列化器,比如Hessian2ObjectOutput
依次序列化 dubbo version、path、version
序列化调用方法名
将参数类型转换为字符串,并进行序列化
对运行时参数进行序列化
序列化 attachments
DubboCodec#encodeRequestData
对请求数据进行序列化操作
把消息体长度写入到消息头中
encodeRequest
ExchangeCodec
请求编码
服务消费方发送请求
检查魔数是否相等
检查可读数据量是否小于消息头长度,若小于则立即返回DecodeResult.NEED_MORE_INPUT
从消息头中获取消息体长度
检测消息体长度是否超出限制,超出则抛出异常
检测可读的字节数是否小于实际的字节数
获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
获取调用编号
通过逻辑与运算得到调用类型,0 - Response,1 - Request
创建Request对象
通过逻辑与运算得到通信方式,并设置到 Request 对象中
通过位运算检测数据包是否为事件类型,如心跳事件
通过反序列换得到调用方法名
通过反序列化得到参数类型字符串,如Ljava/lang/String
将参数字符串解析成字符数组,并解析运行时参数,设置参数类型数组
设置参数列表
在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将调用方法名、attachment、以及调用参数解析出来
仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
根据 url 参数判断是否在 IO 线程上对消息体进行解码
设置data到Request对象中
解码完成得到一个完整的Request对象,对象会被传到先一个入站处理器
读取实际字节长度,继续进行解码工作
请求解码
获取NettyChannel
继续向下调用
NettyHandler.messageReceived
线程派发模型
将请求和响应消息派发到线程池中处理
AllChannelHandler.received
检测通道状态,对于请求或者响应效益,此时state=RECEIVED
将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
其他消息类型通过 switch 进行处理
ChannelEventRunnable#run
处理请求对象
检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
获取 data 字段值,也就是 RpcInvocation 对象
设置 OK 状态码
设置调用结果
handleRequest
向后调用服务,并得到调用结果
将调用结果返回给服务消费端
双向通信
如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
对于普通请求
计算service key
从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象
获取 Invoker 对象,并返回
获取Invoker实例
通过Invoker调用具体的服务
调用doInvoke执行后续的调用,并将调用结果封装到RpcResult中
AbstractProxyInvoker#invoke(Invocation)
创建匿名类对象
调用invokeMethod方法进行后续的调用
JavassistProxyFactory#getInvoker
调用服务
服务提供方接收请求
。。。
DubboCodec#encodeResponseData
对调用结果进行序列化操作
ExchangeCodec#encodeResponse
服务提供方返回调用结果
获取请求编号
检测消息类型,若下面的条件成立,表明消息类型为 Response
创建 Response 对象
创建 DecodeableRpcResult 对象
执行反序列化操作
进行后续的解码工作
根据 url 参数决定是否在 IO 线程上执行解码逻辑
DubboCodec#decodeBody
与请求解码过程类似
响应数据解码
根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
保存响应对象到相应的DefaultFuture实例中
唤醒用户线程,随后用户线程即可从DefaultFuture实例中获取到相应的结果
处理响应
HandlerExchangeHandler#received
向用户线程传递调用结果
服务消费方接收调用结果
调用过程
服务调用过程
Dubbo
0 条评论
回复 删除
下一页