10_Netty源码
2024-05-23 21:36:36 15 举报
Netty原理及源码剖析
作者其他创作
大纲/内容
listener 访问者onResult
获取 clientTransport
SubchannelPickerpickSubchannel()通过 picker 后去负载后的 server 地址 PickResult
绑定端口ServerBootstrap#bind(50070)
实现
返回ClientStream
创建8个请求处理线程NioEventLoopnew EventExecutor[nEventExecutors]
startNewTransport
ClientCalls
LoadBalancerRegistry
NettyClientStream.SinkwriteFrame
创建 channel buildernewChannelBuilder
StaticNameResolverProvider
AbstractManagedChannelImplBuildertarget
childGroup 中的NioEventLoop线程开始不断监听注册到自己selector上的socketChannel的CONNECT事件处理connect事件,建立连接
服务解析器注册
start
每个线程拥有自己的selector
builder.build()
updateBalancingState更新负载状态
故障摘除
channel服务发现/负载均衡
LbHelperImpl 负载均衡工具
https://blog.csdn.net/hxcaifly/article/details/85336795
2、start
postProcessBeforeInitialization根据 grpcClient 注解 创建 channel
构建地址解析器NameResolver
获取channel 选择器 SubchannelPicker
NettyClientStream
构建loadbalancerFactory
创建 netty grpc channel 工厂nettyGrpcChannelFactory
defaultPolicy“pick_first”
NioEventLoopGroup
transport网络传输字节序列化
拦截器GlobalClientInterceptorRegistry
设置对象
注册到实际的parentGroup中的线程(NioEventLoop)中的selector上,一个NioEventLoop对应一个selector关注OP_ACCEPTSelectableChannel#register()拿到selectionKey
实际的 channelInternalSubchannel
createChannel()
设置channel属性channel.attr(key).set(e.getValue());
EventLoopGroupchild请求承接线程池
run()不断循环检查是否有SELECT事件
spring 容器初始化BeanPostProcessor生成 grpc channel 的关键
createChannel()创建带拦截器的 channel
如果 picker 为空,可能会初始化syncContext.execute(new ExitIdleModeForTransport())
处理ACCEPT事件NioEventLoop#processSelectedKey()
forTarget
Sink负责写入数据
开始注册SingleThreadEventLoop-> AbstractNioChannel#doRegister
GrpcClientBeanPostProcessor
configure 将配置设置到channel
halfClose
stubgRPC 服务封装protobuf 协议-序列化
创建 NettyChannelBuildernewChannelBuilder()
不断轮训处理socket事件
ClientCallImpl
提交任务给线程池
AbstractManagedChannelImplBuilderbuild()
创建 channel
轮训获取parentGroup线程池中的处理线程 (NioEventLoop)executors[Math.abs(idx.getAndIncrement() % executors.length)
ManagedChannel
newLoadBalancer返回默认 loadbalancer
默认负责均衡策略AutoConfiguredLoadBalancerFactory中的AutoConfiguredLoadBalancersubchanneltryHandleResolvedAddresses()创建subchannelPicker、创建并连接 subchannel
NameResolverRegistration 服务注册发现
DelayedClientTransportreprocess
默认的负载均衡PickFirstLoadBalancerProvider
初始化
RoundRobinPicker简单轮训
实际调用我们自己的业务逻辑
createSubchannel 创建 subchannel
写入发送队列,等待 childgroup 发送WriteQueueenqueue()
添加ServerBootstrapAcceptor在ChannelPipeline尾部添加acceptor,里面包含我们的childGroup线程池和handler
创建NettyClientTransport
我们自己的接收响应方法observer.onNext(message)
创建 ManagedChannelImpl1、builder2、 nettyTransportFactory3、backoff 重试策略4、线程池5、开启和关闭统计和日志拦截器
NameResolverRegistry.getDefaultRegistry().register(new CustomNameResolverProvider());this.channel = ManagedChannelBuilder .forTarget(\"server\") .usePlaintext() .build();NameResolver 推荐使用 NameResolverRegistry 进行注册;注册要在创建 Channel 之前执行在 Channel 调用 build 方式时,会在 io.grpc.internal.ManagedChannelImpl#ManagedChannelImpl的构造方法中获取 NameResolver.Factory,这个属性的值是由调用 io.grpc.internal.AbstractManagedChannelImplBuilder#getNameResolverFactory 方法获取的,这个方法里面的属性值来自于 io.grpc.NameResolverRegistry#asFactory;NameResolverRegistry 自己通过内部类 NameResolverFactory创建了NameResovler.Factory 的实例,调用 Factory 的 newNameResolver时,从 provider 属性中获取根据优先级排序后的 NameResolver,创建实例并返回第一个创建的有效实例
getChannelFactory
NettyChannelFactory
AutoConfiguredLoadBalancer根据 policy 获取 loadbalancer 的 provider
绑定端口
NameResolverListeneronResult跟新负载均衡地址
OP_CONNECT
ClientStreamListenermessagesAvailable 响应回调
拿到 PickResult
初始化和注册到selectorAbstractBootstrap#initAndRegister
开始监听accept事件
发送socket连接请求
GrpcClientAutoConfiguration
通过 sink 写 frameAbstractClientStreamdeliverFrame
OP_READ
NameResolverRegistryregister()
我们自己的 clientInterceptor
OP_WRITE
AbstractClientStream#start1、为 TransportState 设置响应监听 ClientStreamListener2、NettyClientStream.Sink#writeHeaders 写header
protobuf 转 inputstreamMethodDescriptor#streamRequest
Linux io(selector epoll...)
2
writeUncompressed↓writeBufferChain
decideLoadBalancerProvider选择负载均衡提供者选择顺序1、先用我们的 provider 2、 loadbalancingConfig 策略3、grpclb 自己的4、根据 loadbalancingPolicy 策略对应的 provider5、默认的 pick_first
网络通信框架(netty、OKHTTP、inprogress)
start()首次加载
默认负载均衡工厂AutoConfiguredLoadBalancerFactory
用当前线程直接响应
发送请求AbstractStreamwriteMessage
refresh()刷新
获取 ClientTransportGrpcUtil.getTransportFromPickResult
1 准备
init初始化
构建 transportFactoryNettyTransportFactory
更新 picker
发送 requesttransport.newStream
发送到childGroup线程池进行处理for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i));}
SubchannelImpl
ManagedChannelImpl
messagesAvailable
Bootstrap client初始化
GrpcChannelFactory通过 service name 创建 channel
返回响应到响应缓存
处理read事件处理请求
pipeline调用到这里ServerBootstrap.ServerBootstrapAcceptor#channelRead()
挑选一个NioEventloop
创建请求SubchannelChannelnewCall
注册niosocketchannel
StreamObserverToCallListenerAdapter#onClose
AbstractChannelFactory
MessageFramer#closeMessageFramer#commitToSink
创建nio事件处理线程NioEventLoopNioEventLoopGroup#newChild
所有处理线程共用同一个处理线程池
DnsNameResolver
解析完成
默认实现PickFirstLoadBalancer更新可用的subchannelhandleResolvedAddresses()
parentGroup线程池充当reactor中的acceptor来处理注册请求
获取到parentGroup线程AbstractBootstrapConfig#group
获取到SocketChannel
创建请求处理线程池executor = newDefaultExecutorService(nEventExecutors)
GrpcChannelsPropertieschannel 配置
ServerBootstrap服务器
获取到封装的socketChannel消息Channel child = (Channel) msg;然后用轮训方式获取到childGroup中的一个线程注册到这个socketChannel上面childGroup.register(child)
StreamObserverToCallListenerAdapter#onMessage
构建 NameResolvergetNameResolver()
我们自己的完成方法observer.onCompleted()
写入消息 writePayloadframe 是HTTP2的最小传输单位MessageFramer#writePayload
处理连接请求NioServerSocketChannel#doReadMessages
EventLoopGroupparent请求承接线程池
TransportState负责处理传输状态
HTTP2 协议
NameResolverRegistration服务注册发现
获取 ClientTransport 即NettyClientTransportclientTransportProvider.get()
sendMessage
PickFirstLoadBalancer
List<NameResolverProvider>服务发现与注册提供
selector
channel.newCall
我们自己扩展
exitIdleMode 通过 syncContext 队列让channel 活跃起来1、初始化 LbHelperImpl2、启动 nameResolver.start(listener)
closed
设置 HTTP2 的参数
childGroup线程池冲到react模型中的processor来处理read请求NioEventLoop 处理线程
定时发现
MAC + IP + PORT+TCP
defaultLoadBalancingPolicy
applicationContext.getBean(GrpcChannelFactory.class)
反射构建根据我们设置的ioAbstractBootstrap#channel(NioServerSocketChannel.class)获取到我们要构建的channel为 SocketServerChannel,然后反射实例化
拿到请求对象开始调用startCall
创建 managed channelnewManagedChannel()
obtainActiveTransport
1 调用
读取ACCEPT连接请求AbstractNioMessageChannel.NioMessageUnsafe#read
把socketChannel注册到NioEventloop上关注OP_CONNECT事件
2 请求
关闭 frame 流endOfMessages()
channel初始化
0 条评论
回复 删除
下一页