Netty
2023-12-09 22:58:52 0 举报
netty
作者其他创作
大纲/内容
ChannelInitializer
这个ChannelInitializer其实就是我们自己添加的
代表事件回调
创建线程池(线程数为1,阻塞队列为linkedblockqueue)
new EventExecutor[nThreads];
runAllTasks();
NioServerSocketChannel()
ServerBootstrapAcceptor#channelRead()
tail
config().group().register(channel)
processSelectedKey
ch.configureBlocking(false);
超时或者有事件会break
pipeline
doBind0
NioMessageUnsafe#read
addTask(task);
doRegister();
封装为NioSocketChannel
SokcetChannel注册逻辑跟ServerSocketChannel的注册逻辑一致,注册完同样会调用ChanneInitializer的initChannel方法这样就会把我们自己添加的handler全部添加到pipeline中去
pipeline.invokeHandlerAddedIfNeeded();
startThread();
代表关键操作
channel注册调用,调用完删除该handler
代表异步
给EventExecutor数组每个元素初始化newChild
1、获取任务
super(parent);
死循环执行
init(channel)
config().group()获取parentGroup
head
register0(promise);
pipeline = newChannelPipeline();
int selectedKeys = selector.select(timeoutMillis);
NioByteUnsafe#read
initAndRegister
2、执行任务
pipeline.fireChannelRead(readBuf.get(i));
newSocket创建ServerSocketChannel
register逻辑总结:1、从EventLoopGroup拿一个EventLoop执行注册逻辑2、异步执行任务,把channel注册到selector上并执行ChannelInitializer的initChannel逻辑3、死循环监听事件,并处理对应的事件
调用pipeline里每个handle的channelRegistered方法
异步执行
当channel注册时候会调用
offerTask(task)
new NioEventLoop
循环所有创建的NioSocketChannel
next()从parentGroup里拿一个线程来处理channel的注册register()将其注册到自己的selector上
doReadMessages
p.addLast(new ChannelInitializer<Channel>() {})
processSelectedKeysOptimized
nThread如果没传就是核心数*2
添加任务
创建SocketChannelSocketUtils.accept(javaChannel());
pipeline = newChannelPipeline();
注意点:这里解决了NIO的空轮训bug(可能没有事件也会返回,这个时候已经触发了空轮训bug导致后面没有事件也会返回,CPU就会一直空转,导致CPU打满)怎么解决的?selectCnt=0,每次调用selector.select都会+1,当这个selectCnt大于一个阈值(512),就会重建selector(初始化一个新的selector,把老的里面的selectkey全部转移到新的selector上)。当超时或者有事件返回都会重新把selectCnt=1
childGroup.register(child)
阻塞队列
注册channel到selector,但是没有注册对应的事件
accept事件
把任务放到阻塞队列,这个阻塞队列就是创建NioEventLoop时的阻塞队列
ServerBootstrapAcceptor
initChanner(ctx)
往线程池中提交任务任务重执行的主线方法:SingleThreadEventExecutor.this.run();
我们自己写的
new NioEventLoopGroup
创建selector
NioEventLoop#run()
把网络数据放到bytebuffer中并调用fireChannelRead方法传递过去(会调用我们自己handler里面的这个方法)
pipeline.fireChannelRegistered();
child.pipeline().addLast(childHandler);
case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));
channelFactory.newChannel()
bind
设置serversokcetchannel为非阻塞
for循环所有selectkey事件
这里是childgroup里面的EventLoop里面的selector进行的监听处理这里这样画只是为了简化,因为逻辑都一样
read事件
ChannelPipeline p = channel.pipeline();
NioEventLoop#processSelectedKeys
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
next().register(channel);
doStartThread();
这里调用的socketsocketchannel里面的firechannelread方法,其实就三个handler
0 条评论
回复 删除
下一页