Netty源码
2025-03-18 15:49:29 0 举报
Netty源码
作者其他创作
大纲/内容
head
调用DefaultChannelPipeline的addLast方法创建一个ChannelInitializer
调用doBind0方法将上一步得到的NioServerSocketChannel绑定到指定端口
next
建立连接通道,并封装为NioSocketChannel
tail
NioEventLoop
ServerBootstrap
调用init方法,对上一步实例化得到的NioServerSocketChannel进行初始化
调用addTask方法将注册任务添加到taskQueue中异步执行,添加失败则抛出RejectedExecutionException异常
TailContext
EventLoopGroup workerGroup = new NioEventLoopGroup(10)
b style=\
监听端口,并对SelectionKey.OP_ACCEPT事件感兴趣
prev
调用newSocke方法创建原生的ServerSocketChannel
对SelectionKey.OP_READ事件感兴趣
调用initAndRegister方法进行初始化和注册
调用channelFactory.newChannel()方法,获取之前保存的NioServerSocketChannel无参构造函数,通过反射进行实例化
NioSocketChannel
调用DefaultChannelPipeline的addLast方法将 workerGroup 的EventLoopGroup、NioServerSocketChannel和编码时添加的ChannelInitializer管道初始化程序封装为ServerBootstrapAcceptor对象,并将之前创建的ChannelInitializer对象删除
执行DefaultChannelPipeline定制程序
selector
调用NioServerSocketChannel的doReadMessages方法
EventLoopGroup
DefaultChannelPipeline
Channel channel = NioSocketChannelAbstractChannelHandlerContext head = HeadContextAbstractChannelHandlerContext tail = TailContextChannelFuture succeededFuture = SucceededChannelFutureVoidChannelPromise voidPromise = VoidChannelPromise
EventLoopGroup connectGroup = new NioEventLoopGroup(1)
EventExecutor[] children = new EventExecutor[1]EventExecutorChooser chooser = PowerOfTwoEventExecutorChooser
ServerBootstrapAcceptor
调用pipeline.invokeHandlerAddedIfNeeded方法
调用safeExecute方法安全的运行任务
SelectStrategy selectStrategy = DefaultSelectStrategySelector selector = SelectedSelectionKeySetSelector
Thread thread = FastThreadLocalThreadQueue<Runnable> tailTasks = MpscUnboundedArrayQueueQueue<Runnable> taskQueue = MpscUnboundedArrayQueuefont color=\"#000000\" style=\"\
执行eventLoop.execute(() -> register0(promise))
如果是SelectionKey.OP_READ或SelectionKey.OP_ACCEPT事件,则调用unsafe.read方法处理
SelectionKey.OP_ACCEPT事件会调用NioMessageUnsafe的read方法
选择一个NioEventLoop并注册到其中的多路复用器上
NioSocketChannel和NioServerSocketChannel的DefaultChannelPipeline逻辑一样
获取和客户端建立连接时创建的NioSocketChannel,调用child.pipeline().addLast(childHandler)方法将我们编码时创建的ChannelInitializer程序添加到链表的尾部
最后调用AbstractUnsafe的register方法,将DefaultChannelPromise对象和当前NioEventLoop对象传递过去
NioServerSocketChannel
HeadContext
调用newChannelPipeline()方法创建DefaultChannelPipeline对象
与使用 NIO 一样,调用register方法将NioServerSocketChannel或NioSocketChannel注册到NioEventLoop的Selector多路复用器上
调用chooser.next()方法选择一个NioEventLoop\b,调用它的register方法进行注册
将数据在管道中通过责任链模式进行流式处理
SelectionKey.OP_READ事件会调用NioByteUnsafe的read方法
ChannelInitializer
获取所有OP_ACCEPT事件建立并封装的NioSocketChannel对象
调用DefaultChannelPipeline的addLast方法将负责业务处理的NettyServerHandler对象添加到管道中,并将之前创建的ServerBootstrapAcceptor对象删除
connectGroup
将之前创建的FastThreadLocalThread赋值给NioEventLoop的thread属性
EventExecutor[] children = new EventExecutor[10]EventExecutorChooser chooser = GenericEventExecutorChooser
执行startThread方法,CAS将状态标记从未开始更新为开始
NettyServerHandler
连接事件
最后会调用ServerBootstrapAcceptor的channelRead方法
调用AbstractChannelHandlerContext.invokeChannelRead方法从head节点开始调用所有的ChannelInboundHandler
执行doStartThread方法,调用ThreadPerTaskExecutor的execute方法,再调用DefaultThreadFactory的newThread方法创建FastThreadLocalThread对象,调用start方法启动线程
EventLoopGroup group
用原生的ServerSocketChannel初始化NioServerSocketChannel
将启动通道监听的方法添加到taskQueue中
Thread thread = FastThreadLocalThreadQueue<Runnable> tailTasks = MpscUnboundedArrayQueueQueue<Runnable> taskQueue = MpscUnboundedArrayQueuefont color=\"#ff0000\
设置端口调用bind方法
EventLoopGroup childGroup
workerGroup
注册完成,开始对ChannelPipeline做定制化逻辑
接收和发送的数据会在多路复用器中调用NioSocketChannel的Unsafe组件,将数据在其中的DefaultChannelPipeline中进行流式处理
调用runAllTasks方法,运行之前提交的任务
和使用原生 NIO 一样调用SocketUtils.accept(javaChannel())方法获取和客户端连接的原生SocketChannel对象,并将其封装为NioSocketChannel对象
run方法
遍历所有的事件,调用processSelectedKey方法处理事件
将NioSocketChannel注册到 workerGroup 中其中一个NioEventLoop对应的Selector多路复用器中
调用processSelectedKeys方法处理事件,最后调用的是processSelectedKeysOptimized方法
使用NioServerSocketChannel调用config().group().register(channel)方法进行注册
调用runAllTasksFrom或pollTask方法获取taskQueue队列中的任务
调用childGroup.register方法将NioSocketChannel注册到workerGroup的其中一个NioEventLoop的selector上,由其专有的thread线程对象处理后面的读写事件
......
读写事件
将ServerSocketChannel和NioEventLoop封装为DefaultChannelPromise对象调用register方法
随后的读写事件都将被这个多路复用器监听到,并交由其所在的NioEventLoop绑定的thread线程来处理
Channel channel = NioServerSocketChannelAbstractChannelHandlerContext head = HeadContextAbstractChannelHandlerContext tail = TailContextChannelFuture succeededFuture = SucceededChannelFutureVoidChannelPromise voidPromise = VoidChannelPromise
由类图可知调用的是SingleThreadEventLoop的register方法
死循环执行,和 NIO 一样调用原生的selector.select()方法,在超时时间内获取事件
获取NioSocketChannel的DefaultChannelPipeline对象,调用fireChannelRead方法,在管道中流式处理接收到的数据
调用ch.configureBlocking(false)方法设置为非阻塞,并对SelectionKey.OP_READ事件感兴趣
每个NioEventLoop中的thread属性值是在这里创建的
获取NioServerSocketChannel的DefaultChannelPipeline对象,调用fireChannelRead方法
将端口封装为原生的InetSocketAddress对象调用doBind方法
每个NioEventLoop中的thread属性是在这里赋值的
注册逻辑,调用doRegister方法,最后调用AbstractNioChannel的doRegister方法
异步执行SingleThreadEventExecutor.this.run(),调用NioEventLoop的run方法监听通道
获取 workerGroup 的EventLoopGroup
将NioServerSocketChannel注册到NioEventLoopGroup的NioEventLoop中,NioEventLoop中有一个绑定的处理线程对象,并且有一个Selector多路复用器,其实就是和使用 NIO 一样将SocketChannel注册到Selector多路复用器中
调用channel.pipeline()方法获取NioServerSocketChannel的DefaultChannelPipeline
调用ch.configureBlocking(false)方法设置为非阻塞,并对SelectionKey.OP_ACCEPT事件感兴趣
连接事件会被多路复用器监听到
0 条评论
下一页