Netty源码
2023-03-04 22:28:16 3 举报
netty
作者其他创作
大纲/内容
MyNettyServerHandler
channel注册时调用,调用完删除该handler
for循环处理所有的keys
childHandler就是netty服务端初始代码我们自己写的ChannelInitializer
taskQueue.poll()
bossGroup
SingleThreadEventExecutor#eventLoop.execute(new Runnable() { public void run() { AbstractUnsafe.this.register0(promise); }});
调用ServerBootstrapAcceptor.channelRead方法
initChannel
selector.select(timeoutMillis)
NioServerSocketChannel
this.init(channel)
this.constructor = clazz.getConstructor();
CLIENT
将SocketChannel包装为NioSocketChannel
ChannelFactory<? extends C> channelFactory
SocketUtils.accept(this.javaChannel())
当客户端有连接,则会发生OP_ACCEPT事件
pipeline
EventLoopGroupthis.group=parentGroup
initAndRegister
ch.configureBlocking(false)
调用pipeline里每个handler的channelRegisted方法this.pipeline.fireChannelRegistered()
不同线程组里的线程的selector监听处理
taskQueue.offer(task)
调用pipeline里每个handler的channelRead方法pipeline.fireChannelRead(this.readBuf.get(i))
SingleThreadEventLoop#register
ServerBootstrap
group
SingleThreadEventExecutor.this.run()
AbstractChannel#register
SingleThreadEventExecutor#addTask(task)
tail
往serverchannel中注册handler
new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new NettyServerHandler()); }}
将boss、worker两个组作为属性赋值
ServerBootstrapAcceptor
创建通道初始化对象,设置初始化参数this.childHandler=childHandler
AbstractNioChannel#doRegister()
真正执行register0
直接调用所有handler的channelRead方法
Unsafe.read(NioMessageUnsafe)
ServerBootstrapAcceptor .channelRead
processSelectedKey
构造方法执行
head
SingleThreadEventExecutor#runAllTasks
channel
当客户端往服务端发送消息,则sockeChannel则会发生OP_READ事件
初始channel,并将其感兴趣的事件设置为OP_ACCEPT
OP_READ
NioEventLoop#run()
doReadMessages(readBuf)
this.pipeline = this.newChannelPipeline()
processSelectedKeysOptimized
PowerOfTwoEventExecutorChooserEventExecutor next() { return this.executors[this.idx.getAndIncrement() & this.executors.length - 1]; }
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {...}
EventLoopGroup this.childGroup=childGroup
把serverchannel绑定到网络端口9000
task.run()
channel.pipeline()
将连接过来的socketChannel注册到workgroup里的一个线程selector上
第一次会直接进来,执行队列里的任务
if (executor == null) { executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory()); }this.children = new EventExecutor[nThreads]for(int i = 0; i < nThreads; ++i) {this.children[i] = font color=\"#ba68c8\
将channel感兴趣的时间注册为OP_READ
this.processSelectedKeys
readBuf放的是所有连接过来的socketchannel的OP_ACCEPT
DefaultEventExecutorChooserFactorypublic EventExecutorChooser newChooser(EventExecutor[] executors) { return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser(executors) : new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors)); }
bind(9000)
调用pipeline里每个handler的handlerAdded方法this.pipeline.invokeHandlerAddedIfNeeded()
第一次直接进来
OP_READ | OP_ACCEPT
child.pipeline().addLast(new ChannelHandler[]{this.childHandler})
SingleThreadEventExecutor#Queue<Runnable> taskQueue
doBind
/** * handler的生命周期回调接口调用顺序: * handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete * -> channelInactive -> channelUnRegistered -> handlerRemoved * * handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调; * channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。 * channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。 * channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读; * channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕; * channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。 * channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程; * handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。 */
死循环
ChannelInitializer
childHandler
把task任务放入taskqueue
通过调用chooser的next方法,获取线程池中的某个线程
super(parent)
config().group().register(channel)
当timeoutMillis或有事件过来跳出循环
workerGroup
this.channelFactory.newChannel()
doStartThread
select(boolean oldWakenUp)
具体注册逻辑见ServerSocketChannelthis.childGroup.register(child)
NioEventLoop
此处的逻辑与bossgroup一致
MultithreadEventLoopGroupnext().register(channel)
获取NioServersocketChannel的无参构造方法作为属性赋值
注册chanlnel到selector
new ReflectiveChannelFactory(NioServerSocketChannel.class)
反射生成实例
Unsafe.read(NioByteUnsafe)
register0
protected EventLoop font color=\"#f44336\
死循环监听IO事件
startThread
使用NioServerSocketChannel作为服务器的通道实现this.channelFactory = channelFactory
0 条评论
下一页