netty服务端客户端交互流程源码分析
2021-09-04 19:57:28 15 举报
非常详细
作者其他创作
大纲/内容
SocketChannel
服务端给客户端发数据
@Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException(\"channelFactory\"); } if (this.channelFactory != null) { throw new IllegalStateException(\"channelFactory set already\"); } this.channelFactory = channelFactory; return self(); }
select(wakenUp.getAndSet(false));进入这个方法会看到selector.select(timeoutMillis);这时serversocket还没跟select做绑定呢这个select方法传了参数表示超过timeoutMillis这个值就会退出阻塞继续往下走然后会在run放发的下面看到
// 跟进invokeHandlerAddedIfNeeded()方法最终会到这个callHandlerAddedForAllHandlers() 方法中在这个方法中会看到这个样一句代码 task.execute();会进入这个PendingHandlerAddedTask类中在这个类的execute()方法中会看到 callHandlerAdded0(ctx);这个方法这个ctx是在addLask这个方法中初始化的就是当前对象跟着箭头走就能看到这个对象是什么了然后进入callHandlerAdded0(ctx);这个方法就能看到这样一句代码ctx.callHandlerAdded();然后跟进 就可以看到这样一句代码 handler().handlerAdded(this); 就是从这里调过去的
workerGroup
workerGroup读写事件只要一被注册到这里来之后客户端在发送读写 就会直接访问这里
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
ServerSocketChannel
final ChannelFuture regFuture = initAndRegister();
然后把ServerSocketChannel注册到Selector中或者说跟一个selector进行绑定
SocketChannel接收到客户端请求之后会返回一个SocketChannel
netty封装nio多路复用之后的在nio角度的模型图 bossGroup为2workerGroup 为4
bossGroup
pipeline.fireChannelRead(readBuf.get(i)); 在netty中只要方法开头带fire这个单词说明要调用下一下handler对应的方法了
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException(\"channelClass\"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
当来了一个读事件bossGroup转发的时候是依次给workerGroup的 就是比如workerGroup一共有3个selector 第一次给第一个selector第二次给第二个selector依次类推 到了第四次就会从新开始从第一个selector再次的分配
public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException(\"childHandler\"); } this.childHandler = childHandler; return this; }
children = new EventExecutor[nThreads];
一定要先记住nio多路复用模型图 netty都是根据nio多路复用进行的封装 如果不是很理解nio多路复用模型图会有点难懂netty
nio多路多路复用模型图
gossGroup
ChannelFuture regFuture = config().group().register(channel);
连接事件之后触发读或写等事件
连接事件
在调用到这个方法之前创建了一个selector对象不过这个对象还没调用打开的方法还初始化了线程数量,最终会在MultithreadEventExecutorGroup这个类中的这个方法里初始化netty的线程模型MultithreadEventExecutorGroup
在这个方法中创建了ServerSocketChannel 并且跟selector进行绑定
这是netty对socketchannel的封装之后的结构ServerSocketChannel跟它也是一样的结构 但是在写ServerSocketChannel的时候没写它的结构源码 现在可以根据SocketChannel的结构进行脑补
private static void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; }这里把ChannelInitializer这个handler删了 说了这么多就是想表达netty是怎么把handler删掉的
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
后面有被netty封装后的nio模型 这里的是最原始的nio模型
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }next()就是从NioEventLoopGroup中初始化的线程组中拿出其中一个线程 也就是拿出一个EventExecutor这个类型来进行做业务处理
在nitAndRegister()方法中执行了init(channel);channel = channelFactory.newChannel(); ChannelFuture regFuture = config().group().register(channel);关键代码
然后执行selector.select()放法
if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } }这样一个代码 当执行了 processSelectedKeys();就说明有读写事件了没有读写事件就会执行runAllTasks()这个方法 这是服务端启动很显然不会有事件 所以就会执行这个runAllTasks()方法 执行了这个方法就会跳到看左边的箭头
无参构造方法 public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
这个方法是调用NioServerSocketChannel的无参构造方法 在无参构造方法中执行了SelectorProvider.provider().openServerSocketChannel这么一句代码然后就返回了ServerSocketChannel这个对象 并且还初始化了一个连接事件 就是把表示连接事件的值给NioServerSocketChannel中的属性了还没跟selector进行绑定在构造方法中还初始化了一个pipeline
channel = channelFactory.newChannel();
// 通过跟进super(parent); 进入到这里 在这里给这个ServerSocketChannel绑定了一个pipeline protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
先进入NioMessageUnsafe实现的 因为跟我一步一步的分析源码现在我们客户端跟服务端才刚刚建立连接 进入之后看到doReadMessages(readBuf);跟 pipeline.fireChannelRead(readBuf.get(i));这两个方法 看左边箭头
然后执行selector.select()放法 只负责连接的事件如果有了读写它会把这个SocketChannel注册到workerGroup中的selector中去
AbstractBootstrap在这个类中 public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException(\"group\"); } if (this.group != null) { throw new IllegalStateException(\"group set already\"); } this.group = group; return self(); }
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });这句话先不要看箭头下面的也先不要看 等后面让你看左边的箭头时再看 跳到这里执行register0(promise);这个方法 然后这时就要进入这个方法中看他的逻辑
客户端给服务端发数据
addTask(task); 这个方法就是把Runnable线程放到task队列中了
跟进register0这个方法之后然后再跟进它里面的 doRegister();这个方法 由AbstractNioChannel这个类实现的pipeline.invokeHandlerAddedIfNeeded();还有这个方法调用这个方法之后会调用下一个handler的handlerAdded方法根据上面逻辑也就是会进入ChannelInitializer这个类中执行它的handlerAdded方法
startThread();
EventLoopGroup bossGroup = new NioEventLoopGroup(3);EventLoopGroup workerGroup = new EventLoopGroup (10);
这里写的是 processSelectedKeys();服务端与客户端交互的源码
它继承了MultithreadEventExecutorGroup这个抽象类这个抽象类继承AbstractEventExecutorGroup这个类
SingleThreadEventExecutor.this.run();这里会从NioEventLoopGroup中拿出一个NioEventLoop然后调用它的run方法
跟进上面的方法进入之后再跟进这个方法 processSelectedKeysOptimized();
到了这以及表明服务端以及启动了开始接收客户端的连接或者说等待事件发生
nio中的ServerSocketChannel在netty中称为channel会封装成一个NioServerSocketChannelsocketChannel会封装成一个NioSocketChannel类型 这个两个类型 NioEventLoopGroup都被它给封装 这里面会实现一个线程 所以一般我们叫它线程池 里面具体有什么socketChannel哪里写了
init(channel);
在NioEventLoop这里面会执行SelectorProvider.provider().openSelector();这么一句代码然后把之前的线程组在这里面进行绑定然后初始化taskQueue = newTaskQueue(this.maxPendingTasks); 这个队列是netty为了哪些业务要执行很长时间准备的 当没有读写事件了会把这个队列中存的东西全部执行一遍目前看来就这三个东西重要 其它根据步骤再看源码
SocketChannel接收到客户端请求之后会返回一个SocketChannel当客户端写数据会给服务端发送
执行ChannelInitializer的handlerAdded方法
然后执行selector.select()放法 只负责连接的事件 这个跟下面一样
跟进initChannel(ctx)i方法之后就可以看到这样一句代码nitChannel((C) ctx.channel());它就会调用看箭头吧 看完箭头回来 pipeline.remove(this);它又执行这样一句代码一直跟进remove(this);方法最终会看到 remove0(ctx);这一句代码它的结构看箭头
跟进之后就可以看到这里 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } 然后跟进read方法如果是OP_ACCEPT这个会进入这个类NioMessageUnsafe如果是OP_READ会进入NioByteUnsafe这个类
然后进入这个类的AbstractUnsafe register方法 根据上面步骤一直跟进就能看到
// 在这创建了nio的ServerSocketChannel对象 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href=\"https://github.com/netty/netty/issues/2308\">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( \"Failed to open a server socket.\
跟进方法最终进入doBind(localAddress);这个方法中
addTask(task); startThread();在execute方法执行了这几个核心代码
如果bossGroup有两个就要分配两个端口
如果是OP_READ的话 会直接调用handler的channelRead方法
收藏
0 条评论
下一页