Netty源码全流程详细分析#基于4.1.29.Final
2022-04-16 16:42:07 2 举报
Netty源码全流程分析
作者其他创作
大纲/内容
DefaultChannelPipeline#addLas(handler)
public EventExecutor next() { return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)]; }
callHandlerAdded0(ctx);
ServerBootstrap#init(channel)
doReadBytes(byteBuf));
invokeFlush0();
判断当前evenLoop的状态是否未启动,CAS设置成已启动,使doStartThread只会执行一次
处理普通任务时会影响到处理IO事件,通过ioRatio控制io事件所占比例,设置成100会处理完所有的任务再循环获取IO事件,反而不利于处理IO事件
pipeline.fireChannelReadComplete();
EventLoop
SingleThreadEventExecutor.this.run();
ChannelPipeline p = channel.pipeline();
addTask(task);
执行注册
selector.select()阻塞获取事件
TailContext
safeExecute(task);其实就是task.run();
Head
select(wakenUp.getAndSet(false));
写完数据最后都会把OP_WRITE事件给取消掉
Write是出站操作,从TailContext开始到HeadContext结束
unsafe.finishConnect();
pipeline.fireChannelRead(byteBuf);
移除OP_CONNECT事件
HeadContext#flush
pendingHandlerCallbackHead = task
放入到队列中的注册任务通过runAllTask()执行
startThread();
HeadContext
TaskQueue
readInterestOp
创建nThreads数量的NioEventLoop
new DefaultChannelPipeline(this);创建Pipeline并将其channel属性赋值为NioServerSocketChannel
通过一种掩码符操作来判断该handler是否对某个方法感兴趣,后续事件传播会用到
processSelectedKeys();
DefaultEventExecutorChooserFactory.INSTANCE
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
判断channel是否已注册,是则执行初始化
SocketChannel ch = SocketUtils.accept(javaChannel());
unwrappedSelector = selectorTuple.unwrappedSelector;
DefaultChannelPipeline
此时Channel还未注册,需要创建一个待处理的回调任务供后续调用
accept事件
CASS设置handlerState为ADD_COMPLETE(已完成)
addLast0(newCtx);
死循环
AbstractUnsafe#register0(promise)
此时触发pipeline中的Handler的ServerBootstrapAcceptor#channelRead方法
客户端发送数据触发OP_READ事件
this.chooser = chooserFactory.newChooser(this.children);
id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();
decrementPendingOutboundBytes
标识连接的完成
initAndRegister
pipeline事件传播
incompleteWrite(true);
通过掩码计算查找到pipeline中下一个对channelActive感兴趣的handler
pipeline.fireChannelRegistered();
Selector
WorkGrop
NioEventLoop#run()
doWrite(outboundBuffer);
ChannelInitializer#initChannel(ChannelHandlerContext)
读取客户端发来的数据到ByteBuf中
从taskQueue中取一个任务
取下一个EvenLoop调用
反射获取SelectorImpl类的属性Set<SelectionKey> selectedKeys;Set<SelectionKey> publicSelectedKeys并赋值属性类型为Netty优化过的SelectedSelectionKeySet(HashSet优化为数组)
处理IO事件
调用下一个inbound的channelactive方法
pipeline属性赋值
ChannelHandlerMask#mask0
pipeline.invokeHandlerAddedIfNeeded();
这样后续通过NioEventLoop.selectedKeys就能获取到Selector到达的事件
没有定时任务时阻塞1s多一点然后继续执行
Tail
ChannelId
如果taskQueue不为空则执行selectNow()否则返回SelectStrategy.SELECT,虽然队列中已经有注册任务了但是Selector上未注册任何感兴趣事件,所以此次selectNow()没有任何收获
read事件
绑定端口号
使用bossGroup注册NioEventLoopGroup extends MultithreadEventLoopGroup
Runnable task = pollTask();
new PendingHandlerAddedTask(ctx)
incompleteWrite(false);
默认创建CPU核心数*2的线程数
config().group().register(channel)
AbstractChannelHandlerContext#invokeChannelRead
若自己定义实现handler的channelInactive方法处理了事件,后续没有调用fireChannelInactive();将不会继续传播到下一个handler
init(channel)
AbstractNioChannel#doBeginRead
SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
判断当前线程是否是EvenLoop此时是main线程走else
BossGrop
创建未包装过的Selector,等同于Nio中的Selector.open()
初始化完后移除这个特殊的handler
NioEventLoopGroup#newChild(...)
initChannel((C) ctx.channel());
next = findContextOutbound();
处理普通任务
remove(ctx);
获取最近的定时任务剩余时间,若没有定时任务则返回1秒
AbstractChannel#flush0();
PendingHandlerCallback#execute()
SelectionKey.OP_CONNECT
WorkGroup中选一个EvenLoop注册NioSocketChannel
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
若写缓冲区已经满了无法写入了会触发注册一个OP_WRITE事件,这样当写缓存有空间来继续接受数据时,该写事件就会被触发,继续将没写完的数据写出
outboundBuffer.addFlush();
从Pipeline的Head开始依次传播注册事件
//是否是2的次幂if (isPowerOfTwo(executors.length)) { //按位与(&)操作性能更好 return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); }
MultithreadEventLoopGroup#register(channel)
SingleThreadEventLoop#register(channel)
此时的msg就是NioSocketChannel
Channel注册时作为附件传递
runAllTasksFrom(tailTasks);
由于之前已经将interestOps设置成OP_READ
invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
ServerBootstrap
ChannelInitializer
对相关事件进行处理
HeadContext#channelActive
将Channel注册到绑定的evenLoop中的selector上,设置SelectionKey感兴趣事件为0并将该channel作为附件挂载在SelectionKey的attachment上
......
processSelectedKeysOptimized();
依次调用pipeline中inbound的channelRead方法
SelectionKey
处理优化过的SelectedKey
ctx.writeAndFlush
NioServerSocketChannel
public EventExecutor next() { return this.executors[this.idx.getAndIncrement() & this.executors.length - 1]; }
AbstractBootstrap#bind(int)
从定时任务中获取到期的任务合并到普通任务taskQueue中
super
SingleThreadEventExecutor#execute(Runnable)
此时SelectionKey感兴趣事件为OP_READ和OP_WRITE
this是NioServerSocketChannel,构造NioSocketChannel,设置NioSocketChannel感兴趣事件为OP_READ
时间修正+0.5ms
安全执行
next().register(channel);
id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();
readBuf中就是accept到的nioSocketCannel
将serverChannel绑定到网络端口
if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); }
NioSocketChannel#doWrite
NioServerSocketChannel/NioSocketChannel首次注册上会依次执行传播连接事件
初始化单线程执行器
this.executionMask = mask(handlerClass);
调用evenLoop异步执行注册;这里也是第一次触发eventLoop.execute的地方
设置selectionKey的感兴趣事件为channel上的readInterestOp(即NioServerSocketChannel为OP_ACCEPT,SocketChannel为OP_READ)
ServerBootstrapAcceptor
NioByteUnsafe#read()
taskQueue.offer(task);
AbstractBootstrap#doBind()
write 方法并没有将数据写入 Socket 缓冲区,只是将数据写入到 ChannelOutboundBuffer 缓存中,ChannelOutboundBuffer 缓存内部是由单向链表实现的。
将之前添加的msg标记成flushed,即从unflushedEntry添加到flushedEntry(第一个被写到缓冲区的节点)
callHandlerAddedForAllHandlers();
NioMessageUnsafe#read()
将注册任务往evenLoop的taskQueue添加
异步添加一个ServerBootstrapAcceptor,后续用来处理客户端连接
ServerSocketChannel
返回原始Selector和包装过的Selector
移除OP_WRITE事件
provider.openServerSocketChannel();创建ServerSocketChannel等同于Nio中的ServerSocketChannel.open
readIfIsAutoRead();
afterRunningAllTasks();
等同于Nio中ServerSocketChannel.accept()
ctx.setAddComplete();
pipeline.fireChannelActive();
fetchFromScheduledTaskQueue();
next.invokeChannelInactive();
由于NioEvenLoop没有实现注册,调用父类的注册
初始化tailTasks和taskQueue为MpscQueue
客户端
tail.writeAndFlush(msg);
AbstractNioChannel#doRegister();
客户端连接触发服务端的OP_ACCEPT事件
super(parent);
if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); }
unwrappedSelector = provider.openSelector();
ChannelFactory#newChannel
NioEventLoop(...)
((ChannelInboundHandler) handler()).channelInactive(this);
unsafe.read();
pipeline.fireChannelRead(readBuf.get(i));
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
有任务优先处理任务
Pipeline
获取IO事件
runAllTasks();
setOpWrite();
HeadContext#write
ch.configureBlocking(false);将channel设置为非阻塞
1.标记channel为已注册2.pendingHandlerCallbackHead设置为Null3.执行pendingHandlerCallbackHead
NioEventLoopGroup(int nThreads)
clearOpWrite();
ctx.handler().handlerAdded(ctx);
SelectionKey.OP_WRITE
SelectorTuple selectorTuple = openSelector();selector = selectorTuple.selector;
connect
循环调用Nio底层write(最多默认循环16次)将数据写入Socket缓冲区
通过反射创建NioServerSocketChannel
Unsafe
span style=\"font-size: inherit;\
ch.unsafe().forceFlush();
unsafe#flush();
chooser.next();轮询获取下一个
监听端口
filterOutboundMessage如果 msg 使用的不是 DirectByteBuf,那么它会将 msg 转换成 DirectByteBuf。
channel首次注册时才会调用
往pipeline中添加一个初始化器,此时并未执行initChannel代码
doReadMessages(readBuf);
若Serverbootstrap有配置过父handler则添加到pipeline中
最后执行尾部任务tailTasks
0 条评论
下一页