netty主体流程
2024-11-14 01:19:51 0 举报
netty主体源码
作者其他创作
大纲/内容
for循环处理selectedKeys里的所有key
workGroup
当有客户端往服务端发送数据,SocetChannel则会发送OP_READ事件
if (Thread.interrupted()) { selectCnt = 1; break; }
channelFactory.newChannel()
否
p.addLast(new ChannelInitializer<Channel>() {......})
unsafe.read()(NioMessageUnsafe)
从bossGroup里拿一个线程来处理channel的注册,并把该线程注册到线程自己的selector上
调用到#SingleThreadEventLoop
pipeline.fireChannelRead(byteBuf);
重置当前时间
timeoutMillis <= 0则存在需要立即执行的定时任务,若当前selectCnt=0即第一次进行For循环,则进行一次非阻塞选择并结束for循环结束阻塞
查看TaskQueue是否有任务,如果有任务队列纯在,则通过Selector执行非阻塞选择返回就绪的channel数量,如果不存在在,则执行返回-1(SelectStrategy.SELECT)
super(parent);
Nio阻塞选择初始计数值++
从head开始调用SeverChannel的pipeline里的所有InboundHandler
ch.configureBlocking(false);
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
HeadContext
客户端
是
监听端口
newChannelPipeline();
doRegister();
delayNanos(currentTimeNanos);
计算器==0
下一次循环
如果当前任务队列中没有任务
doReadMessages(readBuf)
pipeline
SingleThreadEventExecutor.this.run();
bossGroup
pipeline.invokeHandlerAddedIfNeeded();
NIOEventLoop
注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
startThread();
handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(newLifeCycleInBoundHandler)执行完成后的回调; channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。 channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读; channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕;channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。 channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程;handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。
进行阻塞选择,使用上面计算的超时时间为阻塞选择的超时时间,同时selectCnt计数+1
processSelectedKeys()
记录for循环的开始时间和selectCnt初始次数
计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; }
OP_READ | OP_ACCEPT关注的是读就绪事件
是否for循环超出指定次数
ServerBootstrap
初始化channel,并将channel感兴趣的事件设置为OP_ACCEPT
获取首个定时任务执行时间
结束For循环
offerTask(task)
进行非阻塞选择重置计算器为1
next().register(channel)
将channel感兴趣的事件设置为OP_READ
NioServerSocketChannel()
TailContext
channel.pipeline();
serverSocketChannel pipeline
若此时任务队列出现任务,因为进行当前select的进入条件时因为任务队列不存在任务,而此时 有任务则必然新增加的任务,若任务队列出现新增的任务,则唤醒当前线程为非阻塞,进行非阻塞选择并结束for循环
runAllTasks()
initAndRegister()
运行TaskQueue异步队列里的任务
NIO 原生 API 的 select() BUG 出现,将会导致 CPU 空转,这里会重建 Selector
ChannelInitializer
死循环执行监听IO事件
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });
SocketChannel pipeline
selector.select(timeoutMillis)
SocketUtils.accept(javaChannel())
在channel的pipeline里如下handler:ch.pipeline().addLast(new LifeCycleInBoundHandler()); handler的生命周期回调接口调用顺序: handlerAdded ‐> channelRegistered ‐> channelActive ‐> channelRead ‐> channelReadComplete> channelInactive ‐> channelUnRegistered ‐> handlerRemoved
是否有就绪channel
SocketChannel注册逻辑跟ServerSocketChannel注册逻辑一样,注册完会调用SocketChannel里的ChannelInitializer把里面我们自己写的Handler全部放入pipeline
绑定网络监听端口
不同线程组里的线程的selector监听处理
int selectedKeys = selector.select(timeoutMillis); selectCnt ++;
将ServerChannel绑定到网络端口
若没有定时任务则selectDeadLineNanos - currentTimeNanos结果是1毫秒,则timeoutMillis结果大于0,若存在定时任务则timeoutMillis表示当前定时任务还有多久执行
获取SocketChannel
child.pipeline().addLast(childHandler);
当前线程被中断,直接结束For返回
task
将SocketChannel包装为NioSocketChannel
readBuf里放的是OP_ACCEPT事件连接过来的所有SocketChannel
计算定时任务队列中第一个需要的定时任务的时间
bind(8888)
// 重建 selectorrebuildSelector();
channel注册时调用,调用完会删除该handler
初始化变量
else if (unexpectedSelectorWakeup(selectCnt))
dobind
将连接过来的socketChannel注册到workGroup里的一个线程的selector上
返回定时任务还有多久开始执行,若无定时任务返回1毫秒
initChannel(ctx)
select(wakenUp.getAndSet(false));
直接调用socketChannel的pipeline里的有有handler的channnelRead方法
是否新增任务
配置channel非阻塞
register0(promise);
ReflectiveChannelFactory反射
当channel注册时会调用
NioEventLoop.run()
注册channel到selector
把task线程放入TaskQueue异步执行
计算当前For的超时时间或者定时任务执行剩余时间
调用pipeline里每个handler的channelRegistered方法
ServerBootstrapAcceptor
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
addTask(task);
long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The code exists in an extra method to ensure the method is not too big to inline as this // branch is not very likely to get hit very frequently. selector = selectRebuildSelector(selectCnt); selectCnt = 1; break; }currentTimeNanos = time;
ServerBootstrap.init(channel)
计数器重置为1
ThreadExecutorMap.execute调用传递过来的ThreadPerTaskExecutor并执行该类的excute方法开启一个新线程执行
当前线程被异常中断
初始化变量int selectCnt = 0;long currentTimeNanos = System.nanoTime();
pipeline.fireChannelRead(readBuf.get(i));
selectedKeys阻塞选择发现存在就绪channeloldWakenUp传入的阻塞状态为非阻塞wakenUp.get()当前线程的非阻塞状态hasTasks()出现新增任务hasScheduledTasks()有就绪的定时任务满足其中任何一个情况就结束For选择直接返回
return constructor.newInstance();
当timeoutMillis超时或事件发送会breank处理
当有客户端连接,则会发生OP_ACCEPT事件
开始进行死循环执行
switch'选择
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD)
#SingleThreadEventExecutor.execute
调用pipeline里每个handler的handlerAdder方法
select方法是否超时结束
config().group().register(channel)
执行流程
重置计数器为1 重构选择器
processSelectedKeysOptimized();
超时时间<=0?
time重新获取当前时间,判断当前时间减去循环开始时间是否超出上面计算的select阻塞的超时时间,若条件成立,则上面的NIO阻塞选择是正常的超时而结束的,则重新设置selsectCnt=1继续第二次For循环。若selectCnt计数次数大于默认值(512)则重构selector该逻辑为了解决NIO中的select方法快速空选择导致的问题,因为如果select时超时时间结束的则必然selectCnt会被初始成1,若NIO的select快速返回,则当前值会直接自增,自增过快时即出现长时间没有channel就绪的情况下select轮询导致CPU超高负载的情况。
pipeline.fireChannelRegistered();
task.run()
初始化ServerChannel的pipeline
childHandler就是netty服务端初始代码我们自己写的ChannelInitializer
初始化NioServerSocketChannel(对ServerSocketChannel的包装)
childGroup.register(child)
0 条评论
下一页