Netty线程模型源码剖析
2022-03-20 06:00:31 1 举报
Netty线程模型源码剖析
作者其他创作
大纲/内容
并将channel感兴趣的事件设置为OP_READ
selector.select(timeoutMillis)
NioEventGroup
将socketChannel进行包装
SocketUtils.accept(javaChannel());
监听端口
p.addLast(new ChannelInitializer<Channel>()){...}
初始化ServerSocketChannel的ChannelPipline
pipeline.fireChannelRead(byteBuf);
不同线程组里的线程的selector监听处理
Selector
doStartThread()
从bossGroup中获取一个NioEventLoop处理channel的注册逻辑并将其注册到线程的slelctor上
AbstractChannel$AbstractUnsafe#eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });
readBuf里放的是op_accept事件连接过来的所有socketChannel
pipeline = newChannelPipeline()
直接调用socketChannel的pipeline里的所有handdler的channelRead方法
eventLopp.exeute
AbstractNioChannel#doRegister()
taskQueue.offer(task)
并配置channle为非阻塞
TailContext
taskQueue.poll()
OP_READ
......
NettyServerHandler
SocketChannel的ChannelPipline
HeadContext
初始化SocketChannle的pipline
super(parent)
NioServerSocketChannel
unsafe.read();#NioByteUnsafe
javaChannel().register(eventLoop().unwrappedSelector()
MultithreadEventLoopGroup#next().register(channel);
ChannelFuture regFuture = initAndRegister()
主要通过0拷贝技术来实现
当channel注册时会调用,调用完会删除此handler
child.pipeline().addLast(childHandler)
Worker Group
bootStrap.bind()
获取socketChannel(可以看NIO代码做为参考)
StringEncoder
死循环监听io事件
this.readInterestOp = readInterestOp
TaskQueue
pipeline.fireChannelReadComplet()
doReadMessages(readBuf);
把task任务放入TaskQueue中异步执行
通过反射创建NioServerSocketChannel对象,是对ServerSocketChannel的包装
select(wakenUp.getAndSet(false))
unsafe.read();#NioMessageUnsafe
addTask(task)
AbstractChannel$AbstractUnsaf#register0(promise)
ChannelInitializer
#NioEventLoop.run()
processSelectedKeysOptimized()
DefaultChannelPipeline
循环处理selectedKeys中的key
DefaultChannelPipeline#pipeline.invokeHandlerAddedIfNeeded()
pipeline.fireChannelRead(readBuf.get(i));
添加一个channelHandler到ChannelPipline
config().group().register(channel)
initChannel(ctx)
SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
newChannelPipeline()
注册channle到selector(包装了NIO代码,可以参考NIO代码来看)
processSelectedKeys()
运行TaskQueue异步队列里的任务
死循环执行
for循环处理selectionkeys的里的所有key
将socketChannel注册到workerGroup里一个线程的slelector上
startThread()
processSelectedKeysOptimized()
DefaultChannelPipeline()
task.run()
childGroup.register(child)
SingleThreadEventExecutor.this.run()
channelFactory.newChannel()
ServerSocketChannel的ChannelPipline
pipeline.fireChannelRegistered()
task
依次执行:1 DefaultChannelPipeline# invokeHandlerAddedIfNeeded(); callHandlerAddedForAllHandlers(); callHandlerAdded0();2 AbstractChannelHandlerContext#callHandlerAdded();3 Channelinitialier#handlerAdded()
ChannelPipeline p = channel.pipeline()
Boss Group
当客户端往服务端发送数据,会触发OP_READ事件
SingleThreadEventLoop#register(Channel channel);
Channelinitialier#handlerAdded()
#NioServerSocketChannel.doReadMessages(List<Object> buf)
StringDecoder
把serverChannel绑定到到网络端口
#AbstractNioMessageChannel.NioMessageUnsafe.read()
ServerBootstrap
当客户端建立连接会触发OP_ACCEPT事件
当timeoutMillis超时或者有事件发生会break处理
ch.configureBlocking(false)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio)
ServerBootstrap.init(channel)
childHandler就是NettyServer端端初始化时我们自己定义的ChannelInitializer
SocketChannel的注册逻辑和ServerSockerChannel的注册逻辑一样,注册完会调用SocketChannlel的ChannelInitializer把里面的handler全部放入pipline
调用channelReadComplete()方法
设置channel为非阻塞
Client
调用pipeline里每个handler的handlerAdded方法
ServerBootstrapAcceptor
0 条评论
回复 删除
下一页