Netty源码分析
2021-06-19 14:19:16 77 举报
高性能网络通信框架Netty源码流程图
作者其他创作
大纲/内容
init(channel)
Tail
if (!registered) 表示pipeline尚未在eventLoop上注册过 该任务将在通道注册后调用ChannelHandler.handlerAdded(…)
initAndRegister()
ServerBootstrap.init(channel)
bossGroup
判断当前 NioEventLoop的state标记是否未启动,并CAS设置为已启动
等同于Nio编程中的 Selector.open()赋值于 unwrappedSelector属性
DefaultChannelPipeline
1、标记 registered属性为true2、暂存 pendingHandlerCallbackHead属性,然后将其赋值为null3、执行暂存对象pendingHandlerCallbackHead
当前newSocket()是创建ServerSocketChannel对象;等同于Nio编程中的 ServerSocketChannel.open()
afterRunningAllTasks()
addLast0(newCtx)
workGroup
fetchFromScheduledTaskQueue()
pipeline.remove(this)
AbstractUnsafe.register0(promise)
NioByteUnsafe.read()
返回 NioServerSocketChannel创建时赋值的unsafe属性 NioMessageUnsafe类型
NioServerSocketChannel
pipeline.fireChannelRegistered()
读写事件NioSocketChannel父类的内部类
这里触发的是 childHandler中追加的处理器
new NioEventLoop(...)
ServerBootstrapAcceptor.channelRead()
doReadBytes(byteBuf)
等同于Nio编程中的ServerSocketChannel.accept()
ch.configureBlocking(false)
事件到达
当前这个方法比较有意思!!!针对pipeline中的处理器(上下文),从Head开始,触发所有入站类型处理的 channelRegistered(),类似于链式调用
initChannel((C) ctx.channel())
pipeline中添加一个系统级处理器,用于后续回调
new PendingHandlerAddedTask(ctx)
还记得在 init() 方法中向pipeline中尾部追加了一个 ChannelInitializer类型(重写initChannel方法)的handler吗?
ServerBootstrapAcceptor
callHandlerAdded0(ctx)
死循环
注册事件NioServerSocketChannel父类的内部类
newCtx
NioEventLoop.run()
unwrappedSelector = selectorTuple.unwrappedSelector
PendingHandlerAddedTask.execute()
SingleThreadEventExecutor.this.run()
注册channel到selector上(参照Nio编程)同时,附加了this(NioServerSocketChannel),即attachment属性
通过这种设计方式,当Selector上有事件到达后,不需要通过 selector.selectedKeys()方法获取事件,直接操作 NioEventLoop.selectedKeys属性就好
注意:ServerBootstrapAcceptor.handlerAdded() 是个空方法,并不像ChannelInitializer会从管道中移除自身
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet()
extends
这个处理器来源于ServerBootstrap的初始化
SelectionKey.OP_READ | SelectionKey.OP_ACCEPT(其他类型的事件,暂不分析)
p.addLast(new ChannelInitializer<Channel>() { ...})
如果 taskQueue 不为空,执行selectNow();否则,返回 SelectStrategy.SELECT虽然队列中有个(注册)任务,但selector上还没有注册任何兴趣事件,注定本次selectNow()不会有收获
pipeline.fireChannelRead(byteBuf)
返回下一个 NioEventLoop由于NioEventLoop本身没有重写register(),所以调用父类SingleThreadEventLoop方法
将客户端连接注册到子事件循环线程组,负责后续的IO读写
callHandlerAddedForAllHandlers()
NioMessageUnsafe.read()
next().register(channel)
channelFactory是持有NioServerSocketChannel空参构造器的工厂类
runAllTasksFrom(taskQueue)
startThread()
ServerBootstrap
doReadMessages(readBuf)
config().group().register(channel)
创建一个回调任务 task
从pipeline中移除这个特殊类型的handler
将channel设置为非阻塞
selector.select(timeoutMillis)
AbstractSet<SelectionKey>
这就是Nio编程中的 selector.select();只不过调用的是其超时执行的重载方法
processSelectedKeysOptimized()
public NioServerSocketChannel()
HeadContext
通过反射的形式,设置Selector的selectedKeys属性
doStartThread()
taskQueue.offer(task)
绑定9000端口
SelectorTuple selectorTuple = openSelector()
创建pipeline,并将其channel属性赋值为当前的NioServerSocketChannel对象
pipeline.fireChannelRead(readBuf.get(i))
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); }});
processSelectedKeys()
final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);
msg就是组装后的 NioSocketChannel
pipeline = newChannelPipeline()
小总结:pipeline.addLast(ChannelHandler) 方法中主要是做两件事:1、创建一个持有传入handler的上下文对象,然后加入到pipeline的尾部;2、调用传入的ChannelHandler.handlerAdded(...)方法。其实handlerAdded的触发机制主要分两种: 第一种是回调方式,体现在ServerBootstrap.init() 时向pipeline中添加一个内置的处理器,那时NioServerSocketChannel 还没有完成注册(即Pipeline的registered属性,一旦标记为true后,再也不会改变),并没有立即调用handlerAdded()方法,而是注册完成后才发起的回调; 第二种是时时调用(其实也不够准确,如果当前执行线程不是NioEventLoop中线程池中的,那么异步线程调用)。除了ServerBootstrap.init()方法中添加的处理器,其他任何形式的pipeline.addLast(),都是会走时时调用。
unsafe.read()
k.attachment() - NioServerSocketChannel.thisjavaChannel()注册时,附加进去的
真正添加处理器上下文
MultithreadEventLoopGroup.register(channel)
doRegister()
Head
pipeline.fireChannelReadComplete()
runAllTasks()
addTask(Runnable task)
this(newSocket(DEFAULT_SELECTOR_PROVIDER))
if (eventLoop.inEventLoop())判断当前线程是否在eventLoop中执行,否则异步执行注册
前置条件:if (ctx.channel().isRegistered())回调重写方法
NioEventLoop结构设计
将客户端发送的二进制消息读到给定的ByteBuf中,并返回读取的字节数量
AbstractChannelHandlerContext.span style=\"font-size: inherit;\
CAS标记上下文(ctx) 的handlerState属性为 ADD_COMPLETE(2)
channelFactory.newChannel()
赋值于 selectedKeys属性
ChannelInitializer.handlerAdded(ChannelHandlerContext ctx)
pipeline的属性赋值
从任务队列taskQueue中抓取一个任务,安全执行死循环,直到抓取不到任务。
将ServerSocketChannel的感兴趣事件设置为OP_ACCEPT
TailContext
重点分析方法!
super(parent)
这里的 childHandler其实是Netty服务端编程时自定义的一个ChannelInitializer类型(重写initChannel()方法)的处理器,如:消息编解码处理器和业务处理器。从这个属性的名称上,我们就可以看出这个处理器是给子事件循环组(workGroup)使用的。最终,这个处理器会加入到NioSocketChannel的管道中。注意,当前channel还没有完成注册,意味着本次加入到管道中的 childHandler会以回调的形式完成处理器添加pendingHandlerCallbackHead.execute()
case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));
handler().handlerAdded(this)
pipeline.invokeHandlerAddedIfNeeded()
创建HandlerContext对象,用handler进行属性赋值,同时持有当前的pipeline
本地创建 selectedKeySet等同于Nio编程中 selector.selectedKeys()中的返回值
SocketChannel ch = SocketUtils.accept(javaChannel())
链式调用Pipeline中所有的入站类型的处理器
NioEventLoop(父类)维护了一个定时任务队列scheduledTaskQueue从定时任务队列中取出所有到期的任务,加入到任务队列taskQueue中;添加失败,则重新放回到定时任务队列中
this代表的是NioServerSocketChannel将this和服务端连接到达的SocketChannel组装成一个对象细看这个类的构造方法,感兴趣事件是 OP_READ当前类是面向客户端的,专门用于处理读写事件;而NioServerSocketChannel是面向服务端,用于处理客户端注册
childGroup.register(child)
config().group() 返回的ServerBootstrap的group属性也就是 bossGroup,继承MultithreadEventLoopGroup抽象类
将注册逻辑加入到任务队列中
bind(9000)
NioEventLoop(父类SingleThreadEventLoop)中维护了一个任务队列 tailTasks;当前方法实际上就是 runAllTasksFrom(tailTasks)
DefaultChannelPipeline.addLast(handler)
疑问:在Nio编程中,注册时传入的是SelectionKey.OP_ACCEPT(数值为16);而Netty源码中注册时,传入的是0,很是不解!?
selectedKeys = selectedKeySet;
pendingHandlerCallbackHead = task;
上下方法之间:针对当前channel的属性赋值 registered = true
0 条评论
下一页