Netty源码分析以及netty线程模型图
2022-01-25 22:13:43 2 举报
参考图灵课堂诸葛老师的讲解绘制
作者其他创作
大纲/内容
init(channel)
Tail
initAndRegister()
Netty线程模型图:注意建立连接和读写请求时不同selector的处理。以及NioSocketChannel和NioServerSocketChannel注册到了不同的Selector上
DefaultChannelPipeline
1、标记 registered属性为true2、暂存 pendingHandlerCallbackHead属性,然后将其赋值为null3、执行暂存对象pendingHandlerCallbackHead
afterRunningAllTasks()
addLast0(newCtx)
workGroup
task任务的解析,这个任务其实就是触发管道里的所有处理器,链式调用处理器,触发各个处理器的连接注册操作pipeline.fireChannelRegistered()
NioByteUnsafe.read()
返回 NioServerSocketChannel创建时赋值的unsafe属性 NioMessageUnsafe类型
NioServerSocketChannel
读事件NioSocketChannel父类的内部类
AbstractBootstrap#doBind
new NioEventLoop(...)
addTask(Runnable task),即是将将来执行的任务【客户端与服务器建立连接即将ServerSocketChannel注册到EventLoop上】加入队列中
doReadBytes(byteBuf)
等同于Nio编程中的ServerSocketChannel.accept()作用是获取客户端的连接
ch.configureBlocking(false)
给pipeline中添加两个处理器,一个是和BossGroup相关的handler,一个是和workGroup相关的ServerBootstrapAcceptor,而ServerBootstrapAcceptor处理器的添加是在workGroup的NIOEventLoop对应线程的异步任务执行的,它一个系统级处理器又称为连接器ServerBootstrapAcceptor,用于后续回调
new PendingHandlerAddedTask(ctx)
还记得在 init() 方法中向pipeline中尾部追加了一个 ChannelInitializer类型(重写initChannel方法)的handler吗?
callHandlerAdded0(ctx)
readBuf里放的是注册了OP_Accept读事件连接过来的所有NIOSocketChannel
注册事件NioServerSocketChannel父类的内部类
PendingHandlerAddedTask.execute()
SingleThreadEventExecutor.this.run()该类的run方法是个抽象方法会回调NioEventLoop#run方法
通过这种设计方式,当Selector上有事件到达后,不需要通过 selector.selectedKeys()方法获取事件,直接操作 NioEventLoop.selectedKeys属性就好
注意:ServerBootstrapAcceptor.handlerAdded() 是个空方法,并不像ChannelInitializer会从管道中移除自身
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet()
SelectionKey.OP_READ | SelectionKey.OP_ACCEPT(其他类型的事件,暂不分析)【由于入栈和出栈走的逻辑一样,只是处理器的调用顺序不同,入栈从前到后,出站从后向前accept客户端连接时监听的事件,read是客户端发数据时监听的事件】
如果 taskQueue 不为空,执行selectNow();否则,返回 SelectStrategy.SELECT。虽然队列中有个(注册)任务,但selector上还没有注册任何兴趣事件,注定本次selectNow()不会有收获
pipeline.fireChannelRead(byteBuf)
返回下一个 NioEventLoop由于NioEventLoop本身没有重写register(),所以调用父类SingleThreadEventLoop方法
将客户端连接SocketChannel的读写事件注册到子事件循环线程组的Selector上,负责后续的IO读写[注意区分:ServerSocketChannel是注册到了boosGroup事件 循环的Selector上了]
runAllTasksFrom(taskQueue)=》task.run()任务执行
ServerBootstrap
创建一个回调任务 task
从pipeline中移除这个特殊类型的handler
将channel设置为非阻塞
这就是Nio编程中的 selector.select();只不过调用的是其超时执行的重载方法,当过了超时时间还没有接收到事件将会继续向下执行而不会继续阻塞
HeadContext
taskQueue.offer(task)
绑定9000端口
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); }});
processSelectedKeys() 处理连接过来的事件,如果有事件则将连接获取的channel注册到workgroup里的NioEventLoop里的Selector上,如果没有事件,则执行队列中的异步任务
小总结:pipeline.addLast(ChannelHandler) 方法中主要是做两件事:1、创建一个持有传入handler的上下文对象,然后加入到pipeline的尾部【其实是倒数第二位置】;2、调用传入的ChannelHandler.handlerAdded(...)方法。其实handlerAdded的触发机制主要分两种: 第一种是回调方式,体现在ServerBootstrap.init() 时向pipeline中添加一个内置的处理器,那时NioServerSocketChannel 还没有完成注册(即Pipeline的registered属性,一旦标记为true后,再也不会改变),并没有立即调用handlerAdded()方法,而是注册完成后才发起的回调; 第二种是实时调用(其实也不够准确,如果当前执行线程不是NioEventLoop中线程池中的,那么异步线程调用)。除了ServerBootstrap.init()方法中添加的处理器,其他任何形式的pipeline.addLast(),都是会走实时调用。
真正添加处理器上下文
MultithreadEventLoopGroup#register(io.netty.channel.Channel)
doRegister()
Head
pipeline.fireChannelReadComplete()
runAllTasks()
eventLoop.execute中执行。即SingleThreadEventExecutor#execute(java.lang.Runnable)方法。
this(newSocket(DEFAULT_SELECTOR_PROVIDER))=》NioServerSocketChannel(ServerSocketChannel channel)
将客户端发送的二进制消息读到给定的ByteBuf中,并返回读取的字节数量
AbstractChannelHandlerContext.span style=\"font-size: inherit;\
赋值于 selectedKeys属性
ChannelInitializer.handlerAdded(ChannelHandlerContext ctx)
TailContext
重点分析方法!
放到队列中的注册任务是何时执行的?是在runAllTasks()的时候执行的
handler().handlerAdded(this)
pipeline.invokeHandlerAddedIfNeeded()channel注册时调用,底层会调用初始化处理器的initChannel((C) ctx.channel())方法,调用完会删除该初始化handler,将ServerBootstrapAcceptor处理器添加进去【这个调用逻辑比较难找】
创建HandlerContext对象,用handler进行属性赋值,同时持有当前的pipeline
MultithreadEventLoopGroup.register(channel)
本地创建 selectedKeySet等同于Nio编程中 selector.selectedKeys()中的返回值
链式调用Pipeline中所有的入站类型的处理器
NioEventLoop(父类)维护了一个定时任务队列scheduledTaskQueue从定时任务队列中取出所有到期的任务,加入到任务队列taskQueue中;添加失败,则重新放回到定时任务队列中
config().group() 返回的ServerBootstrap的group属性也就是 bossGroup,继承MultithreadEventLoopGroup抽象类
将注册逻辑这个任务【该任务实现了runnable接口,当回调时会异步执行】加入到任务队列中
AbstractBootstrap#bind(int)
疑问:在Nio编程中,注册时传入的是SelectionKey.OP_ACCEPT(数值为16);而Netty源码中注册时,传入的是0,很是不解!?
上下方法之间:针对当前channel的属性赋值 registered = true
总共三大步:1:反射利用无参构造创建ServerSocketChannel和new对象创建channelPipeline,此时channel也会以参数的形式注入进来,构建处理器上下文链表,持有链表中的头尾处理器上下文:2:初始化Channel,往channelPipeline里加入ServerBootstrapAcceptor这个处理器channelHandler:3.把ServerSocketChannel注册到BoosGroup组里的选择出来的NioEventLoop事件循环对应线程的Selector对象上
if (!registered) 表示channel尚未在eventLoop上注册过 该任务将在channel通道注册后调用ChannelHandler.handlerAdded(…)
ServerBootstrap.init(channel)
思考:1.ChannelPipeline的创建时机?是在创建ServerSocketChannel时创建的2.NioSocketChannel是何时注册到WorkGroup里面的NIOEventLoop里面的Selector上去的?是在客户端有读写事件发生时触发的3.netty是如何实现无锁串行化的?netty读流程io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read拿到管道里的数据转换为bytebuf,然后依次调用管道里的各个处理器去处理bytebuf数据byteBuf = allocHandle.allocate(allocator); pipeline.fireChannelRead(byteBuf);
bossGroup
判断当前 NioEventLoop的state标记是否未启动,并CAS设置为已启动
等同于Nio编程中的 Selector.open()赋值于 unwrappedSelector属性
当服务器启动时是把serverSocketChannel注册到BossGroup对应的NioEventLoop事件循环对应线程的Selector多路复用器里面,当客户端连接时是将SocketChannel注册到workGroup组的事件循环上,两者逻辑是相同的,但是注册的对象是不同的
当前newSocket()是创建ServerSocketChannel对象;等同于Nio编程中的 ServerSocketChannel.open()
fetchFromScheduledTaskQueue()
AbstractUnsafe#register
pipeline.remove(this)
AbstractUnsafe.register0(promise)
pipeline.fireChannelRegistered()
这里触发的是childHandler中追加的处理器
channel初始化,因为通过channel可以获取channelPipeline,往其channelPipeline中加入ServerBootstrapAcceptor处理器
事件到达
当前这个方法比较有意思!!!针对pipeline中的处理器(上下文),从Head开始,触发所有入站类型处理的 channelRegistered(),类似于链式调用是如何找到下一个handler时可以看下
initChannel((C) ctx.channel())
ServerBootstrapAcceptor
死循环
newCtx
事件循环NioEventLoop.run()死循环执行经典的三个操作
unwrappedSelector = selectorTuple.unwrappedSelector
注册NioServerSocketChannel到selector上,注册成功返回selectKey(参照Nio编程)同时,附加了this(NioServerSocketChannel),即attachment属性
extends
这个处理器来源于ServerBootstrap的初始化【在这个方法里init(channel)可以看到往管道里加入了服务引导类接收器处理器ServerBootstrapAcceptor,所以在链式调用中会调用它的channelRead方法】
callHandlerAddedForAllHandlers()
NioMessageUnsafe.read()该类是处理客户端的连接事件的
next().register(channel)
channelFactory是持有NioServerSocketChannel空参构造器的工厂类
startThread()
doReadMessages(readBuf)
config().group().register(channel)
selector.select(timeoutMillis)
AbstractSet<SelectionKey>
processSelectedKeysOptimized()
创建出对象:NioServerSocketChannel#NioServerSocketChannel()
通过反射的形式,设置Selector的selectedKeys属性
doStartThread()
假设有客户端连接会触发SelectionKey.OP_ACCEPT事件
SelectorTuple selectorTuple = openSelector()
创建pipeline,并将其channel属性赋值为当前的NioServerSocketChannel对象
DefaultChannelPipeline#fireChannelRead此时的pipeline是ServerSocketChannel的,当看到fire**的时候代码底层会逐个调用处理器链上的所有处理器进行处理
final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);此时的这个childHandler就是ServerBootstrap里的childHandler属性,对应的是图左红框部分,就是我们通常写的处理器初始化器实例,注意此时加入管道的是处理器初始化器
创建ServerSocketChannel和ChannelPipeline,构建处理器上下文链表,并持有头尾处理器上下文。channel和pipeline是彼此包含的,你中有我,我中有你,因为在创建pipeline时,channel又将自己作为参数传进去了
msg就是组装后的 NioSocketChannel
pipeline = newChannelPipeline()》DefaultChannelPipeline(Channel channel)
unsafe.read()
k.attachment() - NioServerSocketChannel.thisjavaChannel()注册时,附加进去的for循环处理selectkeys里的所有key
selector.select()阻塞获取事件
客户端
其实链表中持有的是ChannelHandlerContext,而ChannelHandlerContext对ChannelHandler进行了包装,之所以包装是为了方便持有Pipeline【因为处理器上下文是pipeline创建的,在创建处理器上下文时,它将pipeline作为了属性,所以从上下文里可以轻松获取pipeline,用处理器上下文包装处理器就将处理器和pipeline建立起了关系】,方便对两者进行操作
if (eventLoop.inEventLoop())判断当前线程即启动bootstrap的线程和从Bossgroup中取的线程是否是同一个线程即是否是在bossgroup的eventLoop中执行的线程,否则调用eventLoop中的线程异步执行注册
前置条件:if (ctx.channel().isRegistered())回调重写方法
NioEventLoop结构设计
CAS标记上下文(ctx) 的handlerState属性为 ADD_COMPLETE(2)
ChannelFactory#newChannel
pipeline的属性赋值
如果是连接事件会走NioMessageUnsafe.read(),如果是读事件会走AbstractNioByteChannel.NioByteUnsafe#read
从任务队列taskQueue中抓取一个任务,安全执行死循环,直到抓取不到任务。
将ServerSocketChannel的感兴趣事件设置为OP_ACCEPT
super(parent)
这里的 childHandler其实是Netty服务端编程时自定义的一个ChannelInitializer类型(重写initChannel()方法)的处理器,如:消息编解码处理器和业务处理器。从这个childGroup属性的名称上,我们就可以看出这个处理器是给子事件循环组(workGroup)使用的。最终,这个处理器会加入到NioSocketChannel的管道中,NioSocketChannel注册在workGroup对应的EventLoop上,关心客户端的读写事件。注意,当前channel还没有完成注册,意味着本次加入到管道中的 childHandler会以回调的形式完成处理器添加,回调处理的时候会将通道初始化器给删除,将通道初始化器里的所有通道加入到管道中span style=\
case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));
SocketChannel ch = SocketUtils.accept(javaChannel())
this代表的是NioServerSocketChannel将this和客户端连接服务端到达的SocketChannel组装成一个对象NioSocketChannel细看这个类的构造方法,给SocketChannel注册感兴趣事件是 OP_READ当前类是面向客户端的,专门用于注册客户端的读事件;此时会进行客户端的初始化而NioServerSocketChannel是面向服务端,用于处理客户端连接
childGroup.register(child)这个地方的逻辑其实和ServerSocketChannel注册到BossGroup的逻辑一样,只不过此时是客户端的注册,将NioSocketChannel注册到workGroup的NioEventLoop的Selector上。【这一步很重要,连接了bossGroup和workGroup】注册完会将SocketChannel对应的SocketChannelPipeline中ChannelIntilization中的channelHandler都加入到该管道中,加入完之后将自己删除,另外NioEventLoop上的Selector执行Select方法开始监听读写事件了
NioEventLoop(父类SingleThreadEventLoop)中维护了一个任务队列 tailTasks;当前方法实际上就是 runAllTasksFrom(tailTasks)
DefaultChannelPipeline.addLast(handler)
selectedKeys = selectedKeySet;
pendingHandlerCallbackHead = task;
0 条评论
回复 删除
下一页