Netty线程模型源码分析
2021-07-02 15:15:49 2 举报
此图是我对Netty线程模型源码的理解,供大家学习交流
作者其他创作
大纲/内容
Client
head
Buffer
TailContext
StringDecoder
NioEventLoop
ch.configureBlocking(false)
select(wakenUp.getAndSet(false))
step 3:runAllTasks
ServerSocketChannel
初始化ServerSocketChannel的ChannelPipeline
死循环执行
pipeline.fireChannelRead(byteBuf)
Thread Pool
ChannelInitializer
SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
eventLoop.execute
注册读事件
死循环执行监听IO事件
NioByteUnsafe#read()
NIO
SocketChannel的ChannelPipeline
HeadContext
ChannelHandlerContext
taskQueue.offer(task)
Selector
配置感兴趣的事件为OP_ACCEPT事件
finally中调用,执行taskQueue异步队列里的任务
ServerBootstrap
NioSocketChannel
注册
把任务放入taskQueue队列异步执行
初始化channel过程中会注册ACCEPT事件,配置channel为非阻塞
ChannelPipeline
AbstractNioChannel#doRegister()
DefaultChannelPipeline
child.pipeline().addLast(childHandler)
NioServerSocketChannel#doReadMessages
pipeline.fireChannelRegistered()
initChannel(ctx)
添加一个channelHandler到ChannelPipeline
Channel
childGroup.register(child)
Accept
bossGroup
连接请求
Read/Write
NioServerSocketChannel
ChannelHandler
调用pipeline中handler的handlerAdded方法
读写请求
workerGroup
ChannelInitializer#handlerAdded
将socketChannel注册到workGroup里的一个线程的Selector上
ServerSocketChannel的ChannelPipeline
step 2:processSelectedKeys()
processSelectedKeysOptimized()
SocketChannel
注册ACCEPT事件
ServerBootstrapAcceptor
监听
当timeoutMillis超时或者有事件时break
taskQueue
StringEncoder
pipeline = newChannelPipeline()
8080
配置channel为非阻塞
客户端连接
ChannelFuture regFuture = initAndRegister()
selector.select(timeoutMillis)
SelectionKey:channel在Selector中注册的标识,建立了Selector和Channel的关系,并维护channel事件
tail
读取channel数据交给pipeline处理
生成ChannelHandler的双向链表
ChannelFuture regFuture = config().group().register(channel)
监听端口
返回SocketChannel
pipeline.fireChannelRead(readBuf.get(i))
从bossGroup中获取一个NioEventLoop,处理注册逻辑
将SocketChannel包装为NioSocketChannel
..........
NioEventLoop#run
调用ChannelReadComplete()方法
childHandler就是NettyServer端初始化时我们自定义的ChannelInitializer
unsafe.read()
taskQueue.poll()
channel = channelFactory.newChannel()
配置Channel感兴趣的事件为OP_READ事件
SingleThreadEventExecutor.this.run()
Thread
NettyServerHandler
task
Server
for循环处理key
this.readInterestOp = readInterestOp
step 1:Select
循环处理selectedKeys中的key
触发ACCEPT事件
p.addLast(new ChannelInitializer<Channel>() {})
调用pipeline中handler的channelRegistered方法
触发READ事件
SelectionKey
注册Channel到Selector
AbstractNioMessageChannel.NioMessageUnsafe#read
task.run()
((ChannelInboundHandler) handler()).channelRegistered(this)
pipeline.invokeHandlerAddedIfNeeded()
readBuf中存放所有Accept事件连接过来的SocketChannel
......
ServerBootstrap.ServerBootstrapAcceptor#channelRead
processSelectedKeys()
绑定监听端口
bootstrap.bind(8000)
注册channel到selector上
addTask(task)
startThread()
EventLoopGroup
当客户端建立连接,会触发ACCEPT事件
super(parent)
Selector(多路复用选择器)
AbstractChannel#register0(promise)
serverSocketChannel.accept()
int localRead = doReadMessages(readBuf)
调用socketChannel中pipeline中的所有handler的channelRead方法
绑定端口
通过反射的方式创建NioServerSocketChannel对象,是对ServerSocketChannel的包装
runAllTasks(ioTime * (100 - ioRatio) / ioRatio)
serverSocketChannel绑定监听端口
初始化SocketChannel的ChannelPipeline
执行任务
ChannelPipeline p = channel.pipeline()
SocketChannel ch = SocketUtils.accept(javaChannel())
当客户端往服务端发送数据,会触发READ事件
pipeline.fireChannelReadComplete()
next().register(channel)
init(channel)
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } })
initChannel()
0 条评论
下一页