Netty核心源码流程
2024-01-22 21:46:19 1 举报
Netty启动流程,Selector事件监听,线程模型核心源码详解
作者其他创作
大纲/内容
pipeline = newChannelPipeline();
创建客服端Group
连接客服端监听的端口
selectedKeys != 0有事件发生
startThread(); doStartThread();
channel = channelFactory.newChannel();
客服端
processSelectedKeys();处理selectedKeys事件
for (;;) 死循环监听等待事件发生
readBufNIOSocketChannel
业务Handler
p.addLast(new ChannelInitializer<Channel>(){...}
bootstrap.bind(inetPort)绑定监听端口Netty服务端核心源码
MultithreadEventLoopGroupnext().register(channel);
设置WorkGroup
NioServerSocketChannel构造方法
int selectedKeys = selector.select(timeoutMillis);事件监听,当有事件发生或者等待超时执行下面的方法
selector
ChannelPipeline p = channel.pipeline();
init(channel);
doRegister();
super(parent);
判断是事件类型
TailContext
callHandlerAddedForAllHandlers();
new NioEventLoop()
pipeline.invokeHandlerAddedIfNeeded();
BossGroup
super()NioEventLoopGroup();默认线程数为cpu*2
ChannelOption设置TCP参数Netty提供灵活的TCP参数配置能力
NioServerSocketChannel.classNetty提供的NIO包装类后面会通过反射实例化
addTask(task)
设置BossGroup
constructor.newInstance();
服务端(ServerSocketChannel)的pipeline结构
trueEventLoop未初始化
调用服务channel pipeline里的Handlerpipeline.fireChannelRead(readBuf.get(i));
bootstrap.bind(9000)
拿到pipeline 的头,调用管道中的所有HandlerPendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) { task.execute(); task = task.next; }
ChannelInitializer
register(channel)
OP_ACCEPT连接事件
NioEventLoopGroup bossGroup监听Selector上的连接事件
addTask()异步执行
config().group()获取最开始设置的BossGroup线程组
注意:此时线程是workerGroup中的线程。监听的事件也是workerGroup中某一个NIoEventLoop上的Selector事件OP_READ读事件
放入TaskQueue的任务pipeline.addLast(Handler);
newChild()
new NioSocketChannel
开启线程异步执行TaskQueue中的任务
childHandlerinitChannel(SocketChannel ch){ch.pipeline().add(Handler)}客服端channel 的管道,其中保存着业务Handler,执行自定义或者Netty提供的业务代码
创建服务端Group
eventLoop.execute()netty封装的线程,并不是开启线程任务,执行这个方法还有业务逻辑执行
将连接过来的SocketChannel注册到workerGroup一个线程的Selector上childGroup.register(child)
WorkerGroup
执行服务端pipeline的HandlerServerBootstrapAcceptor的channelRead方法
child.pipeline().addLast(childHandler);
移除ChannelInitializer
HeadContext
读取数据到byteBufdoReadBytes(byteBuf)
注册连接事件添加ServerBootstrapAcceptor
new DefaultChannelPipeline(Channel)
这里使用的是直接内存Netty中零拷贝的核心byteBuf = allocHandle.allocate(allocator);
NioSocketChannel
workerGroup中的线程选择逻辑,轮询方式选择executors[idx.getAndIncrement() & executors.length - 1];
NioSocketChannel.classNetty提供的NIO包装类后面会通过反射实例化
taskQueue
反射创建NioServerSocketChannel.
serverSocketChannel.accept();
Bootstrap
run()
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
读事件的注册和连接事件的注册逻辑一样都是放入TaskQueue异步进行注册完将自定义的Handler写入SocketChannel的pipeline
创建EventLoop
runAllTasks()Runnable task = pollTask();
创建config,放入serversocketchannel和ServerSocket后面注册监听事件和绑定端口从config中获取
nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads
next()
设置Group
返回NIO的SocketChannelSocketChannel ch = SocketUtils.accept(javaChannel());
再开启一个线程异步执行下面的方法executor.execute
doBind(localAddress);
创建服务端pipeline
移除ChannelInitializerpipeline.remove(this);
NioEventLoop
NioEventLoopGroup workerGroup监听Selector上的读写事件
获取BossGroup中的NioEventLoop
fireChannelRead()
线程选择
依次调用pipeline中SocketChannel中的Handler上的channelRead方法pipeline.fireChannelRead(byteBuf);
ch.eventLoop().execute()
创建serverSocketChannel
ServerSocketChannel的pipeline结构
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
new
taskQueue = newTaskQueue(this.maxPendingTasks);
ServerBootstrap
group
有客服端连接注册读事件时添加业务Handler
selector = selectorTuple.selector;
taskQueue.offer(task);将任务放入创建BossGroup是初始化的任务队列中
注册Accept/OP_READ事件到Selector
socketChannel.connect(remoteAddress);
依次调用pipeline中SocketChannel中的Handler上的ChannelReadComplete方法pipeline.fireChannelReadComplete();
doReadMessages
newChannelPipeline()
if (!inEventLoop)
register0(promise);
执行ChannelInitializer中的initChannel方法
Handler执行
new NioEventLoopGroup();
NioMessageUnsafe. unsafe.read();
. . .
newSocket(DEFAULT_SELECTOR_PROVIDER)provider.openServerSocketChannel();
设置serversockerchannerthis.ch = ch;设置连接监听事件this.readInterestOp = readInterestOp;设置非阻塞ch.configureBlocking(false);
服务端
select(wakenUp.getAndSet(false));
ServerBootstrapAcceptor
for (;;){ safeExecute(task); task = pollTask(); if(task == null){ brek; }}执行TaskQueue中的任务
SocketChannel的pipeline结构
HandlerinitChannel(SocketChannel ch){ch.pipeline().add(Handler)}添加业务Handler,客服端在接收或者发送数据是会依次调用这些Handler
parent是 NioServerSocketChannelthis.parent = parent;pipeline = newChannelPipeline();
pipeline
NioEventLoop的run方法SingleThreadEventExecutor.this.run();
NioByteChannel. unsafe.read();
将自定义Initializer放入pipeline后面注册会真正放入Handler
super(parent);设置读事件监听值this.readInterestOp = readInterestOp;设置非阻塞ch.configureBlocking(false);将SocketChannel封装当前类属性中this.ch = ch;
add()
initAndRegister()
与服务端类似initAndRegister();
pipeline = pipeline();
config().group().register(channel);
往taskQueue添加的任务
根据pipeline执行操作不同的管道
0 条评论
回复 删除
下一页