netty(4.1.6.Final)
2022-01-12 17:36:04 8 举报
AI智能生成
netty源码学习笔记
作者其他创作
大纲/内容
netty工作原理图
服务器端启动
1. 实例化一个NioEventLoopGroup对象,最终会调用的父类MultithreadEventExecutorGroup构造
该对象中包含一组NioEventLoop,默认个数为cup核数*2
每个NioEventLoop包含一个任务队列,一个nio选择器和线程池(该线程池中只包一个线程,这样就不用处理多线程竞争的问题)
在服务端启动时会从bossGroup中取出一个NioEventLoop并往NioEventLoop的选择器中注册accept事件
然后启动NioEventLoop线程池中的那个线程监听accept事件
EventLoopGroup bossGroup = new NioEventLoopGroup();
该对象中包含一组NioEventLoop,默认个数为cup核数*2
每个NioEventLoop包含一个任务队列,一个nio选择器和线程池(该线程池中只包一个线程,这样就不用处理多线程竞争的问题)
在服务端启动时会从bossGroup中取出一个NioEventLoop并往NioEventLoop的选择器中注册accept事件
然后启动NioEventLoop线程池中的那个线程监听accept事件
EventLoopGroup bossGroup = new NioEventLoopGroup();
1. 如果nThreads小于0,则抛出异常,默认没有指定该值的时候为处理器数量的两倍
2. 如果executor没有指定,则使用默认的线程池ThreadPerTaskExecutor,默认工厂为DefaultThreadFactory
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
3. 实例化一个nThread大小的EventExecutor数组
children = new EventExecutor[nThreads];
children = new EventExecutor[nThreads];
4. 循序调用NioEventLoopGroup#newChild方法创建NioEventLoop对象为children数组赋值
其中args分别为WindowsSelectorProvider,DefaultSelectStrategyFactor,RejectedExecutionHandlers
children[i] = newChild(executor, args);
其中args分别为WindowsSelectorProvider,DefaultSelectStrategyFactor,RejectedExecutionHandlers
children[i] = newChild(executor, args);
1. 调用父类SingleThreadEventLoop构造DEFAULT_MAX_PENDING_TASKS为Integer.MAX_VALUE
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
1. 继续调用父类的SingleThreadEventExecutor的构造
super(parent);
super(parent);
1. 调用父类AbstractScheduledEventExecutor的构造
super(parent);
super(parent);
1. 调用父类AbstractEventExecutor构造设置parent属性为传入的EventExecutorGroup
this.parent = parent;
this.parent = parent;
2. 设置addTaskWakesUp为false
this.addTaskWakesUp = addTaskWakesUp;
this.addTaskWakesUp = addTaskWakesUp;
3. 设置最大等待任务数为Integer.MAX_VALUE
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.maxPendingTasks = Math.max(16, maxPendingTasks);
4. 检查线程池对象是否为空,为空抛出NullPointerException异常
this.executor = ObjectUtil.checkNotNull(executor, "executor");
this.executor = ObjectUtil.checkNotNull(executor, "executor");
5. 实例化任务队列赋值给taskQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
taskQueue = newTaskQueue(this.maxPendingTasks);
1. 调用PlatformDependent.newMpscQueue方法创建任务队列并返回
在SingleThreadEventExecutor#newTashkQueue方法中是以LinkedBlockingQueue作为任务队列
但子类NioEventLoop重写了该方法,最终返回的是MpscChunkedArrayQueue(即多生产者单消费者队列)
关于jctools中的队列后续再研究
return PlatformDependent.newMpscQueue(maxPendingTasks);
在SingleThreadEventExecutor#newTashkQueue方法中是以LinkedBlockingQueue作为任务队列
但子类NioEventLoop重写了该方法,最终返回的是MpscChunkedArrayQueue(即多生产者单消费者队列)
关于jctools中的队列后续再研究
return PlatformDependent.newMpscQueue(maxPendingTasks);
6. 检查拒绝策略是否为空,为空抛出NullPointerException异常
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
2. 和./1.5一样,实例化一个MpscChunkedArrayQueue赋值给tailTasks
taskQueue的优先级高于tailTasks和定时任务,定时任务优先级高于tailTasks
tailTasks = newTaskQueue(maxPendingTasks);
taskQueue的优先级高于tailTasks和定时任务,定时任务优先级高于tailTasks
tailTasks = newTaskQueue(maxPendingTasks);
3. provider = selectorProvider;
4. 这一步主要是通过./3中的SelectorProvider#openSelector()打开一个选择器赋值给selector属性,和nio中一样
selector = openSelector();
selector = openSelector();
5. selectStrategy = strategy;
5. 通过传入的事件池选择工厂创建事件池选择器,选择器中有个next方法用来获取分配的NioEventLoop
chooser = chooserFactory.newChooser(children);
chooser = chooserFactory.newChooser(children);
1. 如果children数量为偶数,则创建PowerOfTowEventExecutorChooser事件池选择器
选择线程池的逻辑为(&运算比%运算要快,所以尽量将线程数量指定为偶数):
executors[idx.getAndIncrement() & executors.length - 1];
选择线程池的逻辑为(&运算比%运算要快,所以尽量将线程数量指定为偶数):
executors[idx.getAndIncrement() & executors.length - 1];
2. 如果children数量为奇数,则创建GenericEventExecutorChooser事件池选择器
选择线程池的逻辑为:
executors[Math.abs(idx.getAndIncrement() % executors.length)];
选择线程池的逻辑为:
executors[Math.abs(idx.getAndIncrement() % executors.length)];
6. 创建一个NioEventLoop关闭监听器terminationListener
7. 为每一个NioEventLoop添加./6中的关闭监听器。
e.terminationFuture().addListener(terminationListener);
e.terminationFuture().addListener(terminationListener);
8. 将所有的NioEventLoop添加到一个 LinkedHashSet属性readonlyChildren 中
2. 和./1中一样,创建一个NioEventLoopGroup对象
在客户端connect服务器端时,会触发服务器端的accept监听,服务器端会创建一个NioSocketChannel对象
然后从workerGroup中取出一个NioEventLoop和该NioSocketChannel对象进行绑定,同一个NioEventLoop中可能会绑定多个NioSocketChannel对象
当NioEventLoop中的选择器有新的read事件到来时会通知相应的NioSocketChannel
EventLoopGroup workerGroup = new NioEventLoopGroup();
在客户端connect服务器端时,会触发服务器端的accept监听,服务器端会创建一个NioSocketChannel对象
然后从workerGroup中取出一个NioEventLoop和该NioSocketChannel对象进行绑定,同一个NioEventLoop中可能会绑定多个NioSocketChannel对象
当NioEventLoop中的选择器有新的read事件到来时会通知相应的NioSocketChannel
EventLoopGroup workerGroup = new NioEventLoopGroup();
3. 实例化一个ServerBootstrap对象
ServerBootstrap b = new ServerBootstrap();
ServerBootstrap b = new ServerBootstrap();
4. 将./1中的bossGroup赋值给ServerBootstrap#group属性,将./2 workerGroup赋值给ServerBootstrap#childGroup属性
b.group(bossGroup, workerGroup)
b.group(bossGroup, workerGroup)
5. 通过传入的class构建一个ReflectiveChannelFactory赋值给ServerBootstrap#channelFactory属性用于后续实例化NioServerSocketChannel
b.channel(NioServerSocketChannel.class)
b.channel(NioServerSocketChannel.class)
6. 调用option方法设置一些TCP参数到一个LinkedHashMap属性options中
b.option(ChannelOption.SO_BACKLOG, 100)
b.option(ChannelOption.SO_BACKLOG, 100)
7. 设置ServerSocketChannel的handler
b.handler(new LoggingHandler(LogLevel.INFO))
b.handler(new LoggingHandler(LogLevel.INFO))
8. 设置SocketChannel 的handler,EchoServerHandler为自定义的一个handler
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
9. 将端口实例化为InetSocketAddress对象传入bind方法进行调用
ChannelFuture f = b.bind(9999).sync();
ChannelFuture f = b.bind(9999).sync();
1. 做一些校验工作,如group,channelFactory,childHandler这些值不能为空
validate();
validate();
2. 调用doBind方法
return doBind(localAddress);
return doBind(localAddress);
1. 初始化并注册ServerSocketChannel,并返回一个DefaultChannelPromise对象
因为在初始化的时候会将一些任务交给一个EventLoop线程异步执行
所以需要返回Future对象进行判断任务执行的情况
final ChannelFuture regFuture = initAndRegister();
因为在初始化的时候会将一些任务交给一个EventLoop线程异步执行
所以需要返回Future对象进行判断任务执行的情况
final ChannelFuture regFuture = initAndRegister();
1. 通过#5中设置的channelFactory创建NioServerSocketChannel实例
会调用NioServerSocketChannel的无参构造
channel = channelFactory.newChannel();
会调用NioServerSocketChannel的无参构造
channel = channelFactory.newChannel();
1. 调用父类AbstractNioMessageChannel的构造方法
super(null, channel, SelectionKey.OP_ACCEPT);
super(null, channel, SelectionKey.OP_ACCEPT);
1. 调用父类AbstractNioChannel的构造
super(parent, ch, readInterestOp);
super(parent, ch, readInterestOp);
1. 调用父类AbstractChannel的构造
super(parent);
super(parent);
1. 设置父channel,此处为null
this.parent = parent;
this.parent = parent;
2. 通过DefaultChannelId工厂类创建channelID
id = newId();
id = newId();
3. 调用AbstractNioMessageChannel#newUnsafe方法实例化一个NioMessageUnsafe对象赋值给unsafe属性
unsafe = newUnsafe();
unsafe = newUnsafe();
4. 调用newChannelPipeline方法实例化DefaultChannelPipeline对象赋值给pipeline
pipeline = newChannelPipeline();
pipeline = newChannelPipeline();
2. 将创建的ServerSocketChannel赋值给ch属性
this.ch = ch;
this.ch = ch;
3. 将事件类型存储在readInterestOp属性,此处是OP_ACCEPT事件
this.readInterestOp = readInterestOp;
this.readInterestOp = readInterestOp;
4. 设置ServerSocketChannel为非阻塞模式
ch.configureBlocking(false);
ch.configureBlocking(false);
2. 实例化NioServerSocketChannelConfig对象赋值给config
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
2. 对NioServerSocketChannel执行一些初始化操作
init(channel);
init(channel);
1. 将#6中设置的选项设置到NioServerSocketChannelConfig中
channel.config().setOptions(options);
channel.config().setOptions(options);
2. 如果有设置属性,将设置的属性循环添加到NioServerSocketChannel中
channel.attr(key).set(e.getValue());
channel.attr(key).set(e.getValue());
3. 获取NioServerSocketChannel中的ChannelPipeline(../1.1.1.1.4有设置)
ChannelPipeline p = channel.pipeline();
ChannelPipeline p = channel.pipeline();
4. 将childGroup,childHandler,childOptions,childAttrs四个变量的取出,供下面addLast使用
5. 调用DefaultChannelPipeline#addLast方法将一个ChannelInitializer对象添加到链表中
关于ChannelInitializer:
ChannelInitializer是继承自ChannelInboundHandlerAdapter的抽象类,类中有定义一个抽象initChannel方法
在执行ChannelInitializer#handlerAdded时会调用initChannel(ChannelHandlerContext ctx)方法,该方法中
会调用抽象方法initChannel,然后在执行完该方法后会将ChannelInitializer从DefaultChannelPipeline链表中移除
关于ChannelInitializer:
ChannelInitializer是继承自ChannelInboundHandlerAdapter的抽象类,类中有定义一个抽象initChannel方法
在执行ChannelInitializer#handlerAdded时会调用initChannel(ChannelHandlerContext ctx)方法,该方法中
会调用抽象方法initChannel,然后在执行完该方法后会将ChannelInitializer从DefaultChannelPipeline链表中移除
1. 对handler做些检查,如是否在没有@Sharable注解的情况下重复注册
checkMultiplicity(handler);
checkMultiplicity(handler);
2. 调用newContext方法将handler包装为一个DefaultChannelHandlerContext对象
newCtx = newContext(group, filterName(name, handler), handler);
newCtx = newContext(group, filterName(name, handler), handler);
3. 调用addLast0方法将newCtx添加到DefaultChannelPipeline链表中
在实例化DefaultChannelPipeline时就初始化好了head和tail
addLast0(newCtx);
在实例化DefaultChannelPipeline时就初始化好了head和tail
addLast0(newCtx);
添加后的链表结构
如果DefaultChannelPipeline#registered为false
(此处满足条件,在#../../3.1.1.1.6.4.3处回调的时候才会设置为true)
(此处满足条件,在#../../3.1.1.1.6.4.3处回调的时候才会设置为true)
4. 设置DefaultChannelHandlerContext对象状态为ADD_PENDING
newCtx.setAddPending();
newCtx.setAddPending();
5. 设置事件回调
callHandlerCallbackLater(newCtx, true);
callHandlerCallbackLater(newCtx, true);
1. 此处added为true,会实例化一个PendingHandlerAddedTask赋值给task,该Task主要是执行callHandlerAdded0(在#9.2.1.3.1.1.1.6.4.3.3处进行回调)
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
2. 如果pendingHandlerCallbackHead属性值为null则将task赋值给pendingHandlerCallbackHead
3. 否则将task作为pendingHandlerCallbackHead的尾结点,PendingHandlerCallback本身是一个单向链表
6. 返回当前DefaultChannelPipeline
return this;
return this;
如果DefaultChannelPipeline#registered为true
会立即调用callHandlerAdded0方法,即立即调用handler#handlerAdded方法
也就是说handlerAdded方法的触发要等NioServerSocketChannel注册到选中器完成后
也就是说handlerAdded方法的触发要等NioServerSocketChannel注册到选中器完成后
3. 调用#1 中创建的bossGroup对象的register方法
ChannelFuture regFuture = config().group().register(channel);
ChannelFuture regFuture = config().group().register(channel);
1. 通过#1.5创建的chooser对象的next方法从事件池数组中
取出第一个NioEventLoop对象调用register方法
return next().register(channel)
取出第一个NioEventLoop对象调用register方法
return next().register(channel)
1. 先实例化一个DefaultChannelPromise对象
然后调用父类SingleThreadEventLoop#register
return register(new DefaultChannelPromise(channel, this));
然后调用父类SingleThreadEventLoop#register
return register(new DefaultChannelPromise(channel, this));
1. 最终会调用#9.2.1.1.1.1.1.3中创建的NioMessageUnsafe#register方法
promise.channel().unsafe().register(this, promise);
promise.channel().unsafe().register(this, promise);
1. eventLoop不能为空,即前面传入进来的this
2. NioServerSocketChannel#registered为true则设置异常到promise中并返回
promise.setFailure(new IllegalStateException("..."));
promise.setFailure(new IllegalStateException("..."));
3. 如果eventLoop不为NioEventLoop类型则设置异常到promise中并返回
promise.setFailure(new IllegalStateException("..."));
promise.setFailure(new IllegalStateException("..."));
4. 将传入的eventLoop赋值给NioServerSocketChannel#eventLoop属性
AbstractChannel.this.eventLoop = eventLoop;
AbstractChannel.this.eventLoop = eventLoop;
5. 如果当前线程就是eventLoop线程池中的那个线程(此处不满足该条件,因为eventLoop中的线程要在./6.3.1中才启动),则由当前线程立即执行register0方法
否则进入./6通过eventLoop#execute执行register0方法
register0(promise);
否则进入./6通过eventLoop#execute执行register0方法
register0(promise);
6. 通过eventLoop.execute执行register0()方法
eventLoop.execute(new Runnable() {
public void run() {
register0(promise);
}
});
eventLoop.execute(new Runnable() {
public void run() {
register0(promise);
}
});
1. 判断当前线程是否为事件池中的那个线程
boolean inEventLoop = inEventLoop();
boolean inEventLoop = inEventLoop();
如果inEventLoop为true(不满足)
2. 则将执行register0方法的任务添加到eventLoop的任务队列中(#1.4.1.5中创建的那个任务队列)
addTask(task);
addTask(task);
如果inEventLoop为false(满足)
3. 启动线程
startThread();
startThread();
如果state为ST_NOT_STARTED状态且修改成ST_STARTED状态成功
(state值由原子类进行维护,有五个值:
ST_NOT_STARTED = 1; //线程未启动,为初始值
ST_STARTED = 2; //线程已启动
ST_SHUTTING_DOWN = 3; //线程停止中
ST_SHUTDOWN = 4; //线程已停止
ST_TERMINATED = 5; //线程已死亡)
(state值由原子类进行维护,有五个值:
ST_NOT_STARTED = 1; //线程未启动,为初始值
ST_STARTED = 2; //线程已启动
ST_SHUTTING_DOWN = 3; //线程停止中
ST_SHUTDOWN = 4; //线程已停止
ST_TERMINATED = 5; //线程已死亡)
1. 该方法会通过eventLoop中的单线程池对象executor的execute方法执行一个Runnable任务
doStartThread();
doStartThread();
1. 将eventLoop线程池中的那个线程保存到eventLoop的thread属性中
inEventLoop方法中就是判断这个thread线程是否等于当前线程
thread = Thread.currentThread();
inEventLoop方法中就是判断这个thread线程是否等于当前线程
thread = Thread.currentThread();
2. 如果interrupted属性值为true,则中断当前线程
3. 执行EventLoop#run方法,该run方法中是一个死循环
会重复的干着右侧1,4,5三件事情
SingleThreadEventExecutor.this.run();
会重复的干着右侧1,4,5三件事情
SingleThreadEventExecutor.this.run();
1. 将selectStrategy.calculateStrategy返回值作为switch的条件执行不同的操作
主要是判断taskQueue或tailTasks中是否有任务
有任务的话将执行selectNow()返回0进入switch的defalut(什么也不干)
没有的话会返回-1
主要是判断taskQueue或tailTasks中是否有任务
有任务的话将执行selectNow()返回0进入switch的defalut(什么也不干)
没有的话会返回-1
1. 值为-2
跳出循环
2. 值为-1
执行select方法,该方法中又是一个循环,重复着右侧的操作
select(wakenUp.getAndSet(false));
select(wakenUp.getAndSet(false));
1. 定时任务执行时间要到了,退出循环。
通过下一个定时任务时间执行时间来计算select的阻塞时间,保证定时任务能及时执行
在没有定时任务的情况下默认为下一个任务将在1S后执行,所以默认的select的阻塞时间为1s
通过下一个定时任务时间执行时间来计算select的阻塞时间,保证定时任务能及时执行
在没有定时任务的情况下默认为下一个任务将在1S后执行,所以默认的select的阻塞时间为1s
2. 如果任务队列中有新的任务进来了,退出循环
3. 阻塞1s中,./1中有计算阻塞时间
int selectedKeys = selector.select(timeoutMillis);
int selectedKeys = selector.select(timeoutMillis);
4. selectCnt值加1,该值用来计算selector阻塞次数
selectCnt ++;
selectCnt ++;
5. 符合右侧中的一个条件,退出循环
1. ./3中的返回值不为0,说明有事件就绪
selectedKeys != 0
selectedKeys != 0
2. oldWakenUp 参数为true
3. 用户主动唤醒
wakenUp.get()
wakenUp.get()
4. 任务队列里面有任务
hasScheduledTasks()
hasScheduledTasks()
5. 第一个定时任务即将要被执行
hasScheduledTasks()
hasScheduledTasks()
6. 线程被中断,退出循序
7. 解决jdk的nio bug,
rebuildSelector();
rebuildSelector();
bug描述
poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP或者POLLERR,eventSet事件集合发生了变化,这就导致Selector会被唤醒,进而导致CPU 100%问题。根本原因就是JDK没有处理好这种情况,比如SelectionKey中就没定义有异常事件的类型。
netty解决
通过判断每次select操作是否至少持续了timeoutMillis秒,有的话重置./4的selectCnt值
如果selectCnt的值超过阀值(默认512),则说明连续多次select少于timeoutMillis秒,可能是出现空轮询了
就会将之前注册到老的selector上的的channel重新转移到新的selector上
如果selectCnt的值超过阀值(默认512),则说明连续多次select少于timeoutMillis秒,可能是出现空轮询了
就会将之前注册到老的selector上的的channel重新转移到新的selector上
3. default
什么也不干
如果this.ioRatio==100,表示执行runAllTasks()没有最大时间限制
2. processSelectedKeys();
3. runAllTasks();
如果this.ioRatio!=100(默认为50)
4. 该方法中会根据selectedKeys的值是否为null判断是否调用processSelectedKeysOptimized
在实例化NioEventLoop的时候是有初始化selectedKeys的,所以会调用processSelectedKeysOptimized
在processSelectedKeysOptimized方法中会迭代 selectedKeys 获取就绪的 IO 事件
然后为每个事件都调用 processSelectedKey 来处理它
processSelectedKeys();
在实例化NioEventLoop的时候是有初始化selectedKeys的,所以会调用processSelectedKeysOptimized
在processSelectedKeysOptimized方法中会迭代 selectedKeys 获取就绪的 IO 事件
然后为每个事件都调用 processSelectedKey 来处理它
processSelectedKeys();
1. 循环调用processSelectedKey方法进行处理就绪事件
各个事件的触发时机请查看"NIO的SelectionKey类型部分"
processSelectedKey(k, (AbstractNioChannel) a);
各个事件的触发时机请查看"NIO的SelectionKey类型部分"
processSelectedKey(k, (AbstractNioChannel) a);
1. 处理OP_CONNECT事件
unsafe.finishConnect();
unsafe.finishConnect();
2. 处理OP_WRITE事件
ch.unsafe().forceFlush();
ch.unsafe().forceFlush();
3. 处理OP_ACCEPT 和 OP_READ事件
不同的对象unsafe对象值是不一样的,accept时是NioMessageUnsafe,read时是NioByteUnsafe
unsafe.read();
不同的对象unsafe对象值是不一样的,accept时是NioMessageUnsafe,read时是NioByteUnsafe
unsafe.read();
OP_ACCEPT
在该方法中会创建NioSocketChannel对象,然后将该对象作为入参调用DefaultChannelPipeline链中的channelRead方法
最终会调用到ServerBootstrapAcceptor#channelRead,进行NioSocketChannel的事件注册操作
最终会调用到ServerBootstrapAcceptor#channelRead,进行NioSocketChannel的事件注册操作
OP_READ
5. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
1. 从scheduledTaskQueue转移定时任务到taskQueue
每次 pollScheduledTask 的时候,只有在当前任务的执行时间已经到了,才会取出来
fetchFromScheduledTaskQueue();
每次 pollScheduledTask 的时候,只有在当前任务的执行时间已经到了,才会取出来
fetchFromScheduledTaskQueue();
2. 取出队列中的第一个Runnable任务
Runnable task = pollTask();
Runnable task = pollTask();
3. 如果没有任务则执行完afterRunningAllTasks方法就return了
afterRunningAllTasks方法中主要是循环执行tailTasks队列中的任务
afterRunningAllTasks();
afterRunningAllTasks方法中主要是循环执行tailTasks队列中的任务
afterRunningAllTasks();
NioEventLoop可以通过executeAfterEventLoopIteration方法向tailTasks中添加收尾任务
比如,你想统计一下一次执行一次任务循环花了多长时间就可以调用此方法
比如,你想统计一下一次执行一次任务循环花了多长时间就可以调用此方法
4. taskQueue中有任务的话会循序调用safeExecute方法执行taskQueue队列中的任务
safeExecute方法中会调用任务的run方法进行执行任务逻辑
每执行64个任务判断一下是不是到传入的最大执行时间了,到了的话就跳出循环
safeExecute(task);
safeExecute方法中会调用任务的run方法进行执行任务逻辑
每执行64个任务判断一下是不是到传入的最大执行时间了,到了的话就跳出循环
safeExecute(task);
5. 和./3中的afterRunningAllTasks方法逻辑一致
afterRunningAllTasks();
afterRunningAllTasks();
4. 和./2中一样,将执行register0方法的任务添加到taskQueue任务队列中
这个任务最终会在./3.1.3.5.4处执行
addTask(task);
这个任务最终会在./3.1.3.5.4处执行
addTask(task);
1. 该方法中主要将ServerSocketChannel注册到selector上,只不过注册的事件值为0,表示对任何事件都不感兴趣
同时在注册的时候会将NioServerSocketChannel对象作为attach参数传了过去供后面的事件处理使用
doRegister();
同时在注册的时候会将NioServerSocketChannel对象作为attach参数传了过去供后面的事件处理使用
doRegister();
2. 设置NioServerSocketChannel#registered为true
registered = true;
registered = true;
3. 调用DefaultChannelPipeline#invokeHandlerAddedIfNeeded
该方法主要是用来在ServerSocketChannel注册到selector上后执行
DefaultChannelPipeline#pendingHandlerCallbackHead回调用的,只会调用一次
pipeline.invokeHandlerAddedIfNeeded();
该方法主要是用来在ServerSocketChannel注册到selector上后执行
DefaultChannelPipeline#pendingHandlerCallbackHead回调用的,只会调用一次
pipeline.invokeHandlerAddedIfNeeded();
1. 将DefaultChannelPipeline#registered设置为true
registered = true;
registered = true;
2. 将单向链表DefaultChannelPipeline#pendingHandlerCallbackHead存在临时变量task
然后设置DefaultChannelPipeline#pendingHandlerCallbackHead为null
this.pendingHandlerCallbackHead = null;
然后设置DefaultChannelPipeline#pendingHandlerCallbackHead为null
this.pendingHandlerCallbackHead = null;
3. 循环调用单向链表节点的execute方法,此处链表中应该只有一个节点
就是PendingHandlerAddedTask,所以会调用PendingHandlerAddedTask#execute
while (task != null) {
task.execute();
task = task.next;
}
就是PendingHandlerAddedTask,所以会调用PendingHandlerAddedTask#execute
while (task != null) {
task.execute();
task = task.next;
}
1. 取出NioServerSocketChannel#eventLoop,在#9.2.1.3.1.1.4有设置
EventExecutor executor = ctx.executor();
EventExecutor executor = ctx.executor();
如果当前线程是executor中的那个单线程(此处满足)
2. 直接执行callHandlerAdded0方法
callHandlerAdded0(ctx);
callHandlerAdded0(ctx);
1. 该方法最终会通过initChannel((C) ctx.channel());方式
调用#9.2.1.2.5中addLast的那个ChannelInitializer#initChannel方法
此处再次贴出那块的代码进行分析下
ctx.handler().handlerAdded(ctx);
调用#9.2.1.2.5中addLast的那个ChannelInitializer#initChannel方法
此处再次贴出那块的代码进行分析下
ctx.handler().handlerAdded(ctx);
1. 从NioServerSocketChannel取出pipeline,即DefaultChannelPipeline
final ChannelPipeline pipeline = ch.pipeline();
final ChannelPipeline pipeline = ch.pipeline();
2. ServerBootstrapConfig中取出设置的handler(即#7中设置的LoggingHandler)
通过pipeline.addLast(handler);添加到DefaultChannelPipeline链表尾结点的前一节点
ChannelHandler handler = config.handler();
通过pipeline.addLast(handler);添加到DefaultChannelPipeline链表尾结点的前一节点
ChannelHandler handler = config.handler();
3. 向eventLoop中添加了一个任务,该任务就是向pipeLine链表中addLast一个ServerBootstrapAcceptor对象
ServerBootstrapAcceptor对象的构造中传入在main方法中为childGroup的设置的一些配置,如自定义hander
在有客户端请求服务器连接时会触发服务器accept事件,最终会执行服务器DefaultChannelPipeline链中inbound类型hander的channelRead方法
就会执行到ServerBootstrapAcceptor#channelRead方法,该方法中会通过childGroup中的线程去执行NioSocketChannel的注册操作
在注册完成后会在HeadContext#channelActive中设置对read感兴趣(BossGroup是在doBind中调用channelActive的,而此处是在注册完后直接调用的,因为此时端口已绑定,isActive()方法返回true,具体查看#9.2.1.3.1.1.1.6.4.6)
ServerBootstrapAcceptor对象的构造中传入在main方法中为childGroup的设置的一些配置,如自定义hander
在有客户端请求服务器连接时会触发服务器accept事件,最终会执行服务器DefaultChannelPipeline链中inbound类型hander的channelRead方法
就会执行到ServerBootstrapAcceptor#channelRead方法,该方法中会通过childGroup中的线程去执行NioSocketChannel的注册操作
在注册完成后会在HeadContext#channelActive中设置对read感兴趣(BossGroup是在doBind中调用channelActive的,而此处是在注册完后直接调用的,因为此时端口已绑定,isActive()方法返回true,具体查看#9.2.1.3.1.1.1.6.4.6)
2. 更新handlerState状态为ADD_COMPLETE
ctx.setAddComplete();
ctx.setAddComplete();
如果当前线程不是executor中的那个单线程(此处不满足)
3. 将执行callHandlerAdded0方法作为任务添加到eventLoop中进行执行
说白了就是只能使用eventLoop中的线程执行任务callHandlerAdded0
callHandlerAdded0(ctx);
说白了就是只能使用eventLoop中的线程执行任务callHandlerAdded0
callHandlerAdded0(ctx);
4. 该方法中主要是执行promise.trySuccess()操作
safeSetSuccess(promise);
safeSetSuccess(promise);
1. cas设置DefaultPromise#result属性值为DefaultPromise#SUCCESS
如果成功后DefaultPromise#waiter>0还会执行notifyAll();唤醒阻塞在该DefaultPromise对象上的锁
setSuccess0(result)
如果成功后DefaultPromise#waiter>0还会执行notifyAll();唤醒阻塞在该DefaultPromise对象上的锁
setSuccess0(result)
2. 通知监听器执行回调方法
notifyListeners();
notifyListeners();
5. 从头结点HeadContext开始,调用DefaultChannelPipeline链中inbound类型handler的channelRegistered方法
pipeline.fireChannelRegistered();
pipeline.fireChannelRegistered();
6. 通过isActive()方法判断判断服务器端口是不是绑定了,在客户端连接服务器端触发ServerBootstrapAcceptor注册NioSocketChannel时服务器端口已经绑定好了,这个时候满足条件,会执行pipeline.fireChannelActive(),fireChannelActive()中最终会设置对读感兴趣事件
5. 如果eventLoop#state值>=4,则移除该task并执行拒绝策略
if (isShutdown() && removeTask(task)) {
reject();
}
if (isShutdown() && removeTask(task)) {
reject();
}
6. 如果同时满足右侧添加执行selector.wakeup();(此处满足这些条件)
addTaskWakesUp值为false
task不是NonWakeupRunnable类型
inEventLoop为false
原子类属性wakenUp成功从false修改为true
2. 最终返回promise
return promise;
return promise;
4. return regFuture
2. 从./1返回的reFuture中取出初始化好的NioServerSocketChannel
final Channel channel = regFuture.channel();
final Channel channel = regFuture.channel();
3. 判断./1 中初始化注册的过程中是否有发生异常,有则直接返回regFuture
如果regFuture.isDone()为true
如果为true说明ServerSocketChannel已经初始化注册完成则立即执行doBind0操作
isDone通过判断DefaultPromise#result属性值判断Future是否完成
在./1.3.1.1.1.6.4.4.1会将该值修改为SUCCESS完成且成功
如果为true说明ServerSocketChannel已经初始化注册完成则立即执行doBind0操作
isDone通过判断DefaultPromise#result属性值判断Future是否完成
在./1.3.1.1.1.6.4.4.1会将该值修改为SUCCESS完成且成功
4. 通过channel创建一个新的DefaultChannelPromise对象
因为后续的bind操作依据是在那个EventLoop线程中以异步任务的形式进行执行
所以通过这个新建的promise在主线程中判断执行情况
ChannelPromise promise = channel.newPromise();
因为后续的bind操作依据是在那个EventLoop线程中以异步任务的形式进行执行
所以通过这个新建的promise在主线程中判断执行情况
ChannelPromise promise = channel.newPromise();
5. 在doBind0方法中主要是提交了一个执行绑定操作的异步任务
doBind0(regFuture, channel, localAddress, promise);
doBind0(regFuture, channel, localAddress, promise);
1. 该方法调用DefaultChannelPipeline#bind,DefaultChannelPipeline#bind方法会从TailContext开始往前面查找
outbound类型的handler,调用bind方法,最终会执行HeadContext#bind(和./1.3.1.1.1.6.4.5有些类似)
此处添加了一个CLOSE_ON_FAILURE监听器用来在promise执行失败的情况下关闭ServerSocketChannel
channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
outbound类型的handler,调用bind方法,最终会执行HeadContext#bind(和./1.3.1.1.1.6.4.5有些类似)
此处添加了一个CLOSE_ON_FAILURE监听器用来在promise执行失败的情况下关闭ServerSocketChannel
channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
1. 通过NioMessageUnsafe#bind执行绑定操作
unsafe.bind(localAddress, promise);
unsafe.bind(localAddress, promise);
1. 对ServerSocketChannel执行绑定操作
doBind(localAddress);
doBind(localAddress);
2. 此处又提交了一个异步任务,该任务中会调用DefaultChannelPipeline#fireChannelActive方法
该方法中会调用HeadContext#channelActive方法
public void run() {
pipeline.fireChannelActive();
}
该方法中会调用HeadContext#channelActive方法
public void run() {
pipeline.fireChannelActive();
}
1. 从HeadContext开始调用DefaultChannelPipeline链表中inbound类型hander的channelActive方法
ctx.fireChannelActive();
ctx.fireChannelActive();
2. 该方法中会从TailContext开始调用DefaultChannelPipeline链表中outbound类型hander的read方法
最后中会调用到HeadContext#read方法
readIfIsAutoRead();
最后中会调用到HeadContext#read方法
readIfIsAutoRead();
1. 调用AbstractUnsafe#beginRead方法,该方法中将会执行doBeginRead方法,主要是将
ServerSocketChannel的selectionKey值修改为16(在#9.2.1.3.1.1.1.6.4.1注册的值为0),表示对OP_ACCEPT事件感兴趣
unsafe.beginRead();
ServerSocketChannel的selectionKey值修改为16(在#9.2.1.3.1.1.1.6.4.1注册的值为0),表示对OP_ACCEPT事件感兴趣
unsafe.beginRead();
3. 设置DefaultPromise#result为SUCCESS表示执行成功
safeSetSuccess(promise);
safeSetSuccess(promise);
6. 返回新创建的promise,通过
return promise;
return promise;
如果regFuture.isDone()为false
如果为false则往regFuture注册一个监听器
在./1.3.1.1.1.6.4.4.2会回调该监听器执行doBind0操作,监听器中执行的操作基本上和上面类似
如果为false则往regFuture注册一个监听器
在./1.3.1.1.1.6.4.4.2会回调该监听器执行doBind0操作,监听器中执行的操作基本上和上面类似
客户端启动
5. 客户端启动其他步骤和服务器端启动大同小异,此处只分析connetct
下面的方法最终会调用doResolveAndConnect
ChannelFuture f = b.connect("localhost", 9999).sync();
下面的方法最终会调用doResolveAndConnect
ChannelFuture f = b.connect("localhost", 9999).sync();
1. 和服务器端#9.2.1逻辑相似
只不过这里是初始化NioSocketChannel并注册SocketChannel到选择器中
final ChannelFuture regFuture = initAndRegister();
只不过这里是初始化NioSocketChannel并注册SocketChannel到选择器中
final ChannelFuture regFuture = initAndRegister();
2. 取出channle
final Channel channel = regFuture.channel();
final Channel channel = regFuture.channel();
3. 和服务器端一样,future中的初始化注册操作已经完成
则直接执行接下面的操作,否则添加监听器在监听器中执行
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
则直接执行接下面的操作,否则添加监听器在监听器中执行
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
1. 获取在iniAndRegister中为channe中分配的那个eventLoop
final EventLoop eventLoop = channel.eventLoop();
final EventLoop eventLoop = channel.eventLoop();
2. 通过上面的eventLoop创建一个InetSocketAddressResolver对象,并缓存到一个Map对象resolvers中
key为这个eventLoop
AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
key为这个eventLoop
AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
3. 通过上面的resolver判断remoteAddress是不是已经resolve过了,没有的话先resolve
这个resolve的过程是异步的,在这个异步操作执行后才会调用doConnect
doConnect(remoteAddress, localAddress, promise);
这个resolve的过程是异步的,在这个异步操作执行后才会调用doConnect
doConnect(remoteAddress, localAddress, promise);
1. 从connectPromise中取出channel,这个promise是#5.3中通过channel创建的,里面有保存channel的引用
final Channel channel = connectPromise.channel();
final Channel channel = connectPromise.channel();
2. 调用AbstractChannel#connect方法,最终会调用TailContext#connect
channel.connect(remoteAddress, localAddress, connectPromise);
channel.connect(remoteAddress, localAddress, connectPromise);
1. TailContext#connect中会从TailContext开始往前面查找
outbound类型的handler,最终会调用HeadContext#connect
HeadContext#connect中会调用AbstractNioChannel#connect
return tail.connect(remoteAddress, promise);
outbound类型的handler,最终会调用HeadContext#connect
HeadContext#connect中会调用AbstractNioChannel#connect
return tail.connect(remoteAddress, promise);
1. 调用doConnect方法,该方法中会通过SocketChannel发送connect请求
如果connect的结果为false(由于是非阻塞IO,一般会立即返回false)
则修改selectionKey#interestOps为8(之前是0),表示等待SelectionKey.OP_CONNECT事件就绪
在三次握手后会触发OP_CONNECT事件就绪,与此同时服务器端OP_ACCEPT事件也会就绪(#9.2.1.3.1.1.1.6.3.1.3.4.1)
doConnect(remoteAddress, localAddress)
如果connect的结果为false(由于是非阻塞IO,一般会立即返回false)
则修改selectionKey#interestOps为8(之前是0),表示等待SelectionKey.OP_CONNECT事件就绪
在三次握手后会触发OP_CONNECT事件就绪,与此同时服务器端OP_ACCEPT事件也会就绪(#9.2.1.3.1.1.1.6.3.1.3.4.1)
doConnect(remoteAddress, localAddress)
如果上面的connect返回true
2. 立即执行下面的方法
(即使此处connect没有立即返回true,在超时时间内监听到connect事件后也会调用该方法)
fulfillConnectPromise(promise, wasActive)
(即使此处connect没有立即返回true,在超时时间内监听到connect事件后也会调用该方法)
fulfillConnectPromise(promise, wasActive)
1. 设置promise为Success,然后执行注册在promise上的监听器
boolean promiseSet = promise.trySuccess();
boolean promiseSet = promise.trySuccess();
2. 和服务器端#9.2.5.1.1逻辑类似,只不过这里是监听read事件,而服务器端监听的是accept
pipeline().fireChannelActive();
pipeline().fireChannelActive();
1. 执行执行DefaultChannelPipeline链表中的inbound类型handler的channelActive方法
ctx.fireChannelActive();
ctx.fireChannelActive();
2. 调用DefaultChannelPipeline链表中outbound类型hander的read方法
最终会调用到HeadContext#read设置selectionKey#interestOps为1,监听读事件
readIfIsAutoRead();
最终会调用到HeadContext#read设置selectionKey#interestOps为1,监听读事件
readIfIsAutoRead();
如果没有connect返回false
3. 添加一个延时任务进行处理超时的情况(在超时时间内监听到connect事件后会关闭该超时任务,具体可查看AbstractNioChannel#finishConnect的finally代码)
通过还添加了一个监听器用来在promise取消的时候关闭这个延时任务
通过还添加了一个监听器用来在promise取消的时候关闭这个延时任务
3. 在connectPromise中添加一个监听器在connect失败后关闭NioSocketChannel
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
NioSocketChannel读操作分析
1. 取出NioSocketChannelConfig
final ChannelConfig config = config();
final ChannelConfig config = config();
2. 取出NioSocketChannel中的DefaultChannelPipeline
final ChannelPipeline pipeline = pipeline();
final ChannelPipeline pipeline = pipeline();
3. 从NioSocketChannelConfig中取出allocator属性,该属性值为ByteBufAllocator.DEFAULT而ByteBufAllocator.DEFAULT的值为ByteBufUtil.DEFAULT_ALLOCATOR
在没有设置系统属性“io.netty.allocator.type”的情况下只要不是Android环境allocator值为PooledByteBufAllocator对象
final ByteBufAllocator allocator = config.getAllocator();
在没有设置系统属性“io.netty.allocator.type”的情况下只要不是Android环境allocator值为PooledByteBufAllocator对象
final ByteBufAllocator allocator = config.getAllocator();
4. recvBufAllocHandle()方法最终获取的对象为AdaptiveRecvByteBufAllocator#HandleImpl
该对象主要是用来负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
该对象主要是用来负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
在AdaptiveRecvByteBufAllocator中保存着一个长度53的整型数组SIZE_TABLE
里面的值为从16起下一个元素加16一直到496,然后从512起,以一个元素*2一直到整型溢出供53个元素
在AdaptiveRecvByteBufAllocator中还保存着DEFAULT_MINIMUM,DEFAULT_INITIAL,DEFAULT_MAXIMUM三个属性值分别为64,1024,65536
HandleImpl在实例化的时候会为属性minIndex,maxIndex,index,nextReceiveBufferSize赋值,前三个取值就是64,1024,65536在SIZE_TABLE的下标位置
nextReceiveBufferSize取值就是SIZE_TABLE[index]即1024,这个值将作为#7中申请内存时的初始内存
里面的值为从16起下一个元素加16一直到496,然后从512起,以一个元素*2一直到整型溢出供53个元素
在AdaptiveRecvByteBufAllocator中还保存着DEFAULT_MINIMUM,DEFAULT_INITIAL,DEFAULT_MAXIMUM三个属性值分别为64,1024,65536
HandleImpl在实例化的时候会为属性minIndex,maxIndex,index,nextReceiveBufferSize赋值,前三个取值就是64,1024,65536在SIZE_TABLE的下标位置
nextReceiveBufferSize取值就是SIZE_TABLE[index]即1024,这个值将作为#7中申请内存时的初始内存
5. 为allocHandle设置 maxMessagePerRead,totalMessages,totalBytesRead三个属性的值,这三个值是在HandleImpl的父类MaxMessageHandle中定义的
其中maxMessagesPerRead的值是在NioSocketChannelConfig父类DefaultChannelConfig的构造中创建AdaptiveRecvByteBufAllocator对象的时候设置进去的
allocHandle.reset(config);
其中maxMessagesPerRead的值是在NioSocketChannelConfig父类DefaultChannelConfig的构造中创建AdaptiveRecvByteBufAllocator对象的时候设置进去的
allocHandle.reset(config);
6. ByteBuf byteBuf = null;
do while循环读取Socket缓冲区的数据
7. 调用allocHandle#allocHandle方,该方法中通过allocator对象申请内存空间
初始内存空间大小为nextReceiveBufferSize的值(一开始默认为1024)
byteBuf = allocHandle.allocate(allocator);
初始内存空间大小为nextReceiveBufferSize的值(一开始默认为1024)
byteBuf = allocHandle.allocate(allocator);
8. doReadBytes(byteBuf)就是从SocketChannel中读取数据到byteBuf,并返回读取的字节个数
然后将读取的字节个数作为入参调用allocHandle#lastBytesRead,lastBytesRead方法中会设置lastBytesRead为读取字节个数
同时累加到totalBytesRead变量中,如果该变量溢出了,则重置为Integer.MAX_VALUE
allocHandle.lastBytesRead(doReadBytes(byteBuf));
然后将读取的字节个数作为入参调用allocHandle#lastBytesRead,lastBytesRead方法中会设置lastBytesRead为读取字节个数
同时累加到totalBytesRead变量中,如果该变量溢出了,则重置为Integer.MAX_VALUE
allocHandle.lastBytesRead(doReadBytes(byteBuf));
9. 如果lastBytesRead值小于等于0,则执行完右侧操作后跳出循环
allocHandle.incMessagesRead(1);
allocHandle.incMessagesRead(1);
byteBuf#release释放申请的内存
设置byteBuf为null帮助GC回收
如果lastBytesRead值小于0则标识close变量为true
说明请求方可能关闭了,在#14中进行进一步处理
说明请求方可能关闭了,在#14中进行进一步处理
10. 累加totalMessages变量
allocHandle.incMessagesRead(1);
allocHandle.incMessagesRead(1);
11. 调用DefaultChannelPipeline链中的handler的channelRead方法
pipeline.fireChannelRead(byteBuf);
pipeline.fireChannelRead(byteBuf);
12. 这一步主要是重新调整内存分配策略,比如说上面的while循环超过了 maxMessagePerRead然后跳出循环了
那么此处就会将HandleImpl中的index将会后移(这样nextReceiveBufferSize值也就变大了)
那么下一次触发read事件重新进入while读取剩下的那些数据时,#7处申请的内存大小就会调整成一个比1024更大的值
allocHandle.readComplete();
那么此处就会将HandleImpl中的index将会后移(这样nextReceiveBufferSize值也就变大了)
那么下一次触发read事件重新进入while读取剩下的那些数据时,#7处申请的内存大小就会调整成一个比1024更大的值
allocHandle.readComplete();
13. 调用DefaultChannelPipeline链中的handler的channelReadComplete方法
当请求的传输的数据比较大时上面的while循环可能会执行多次,也就是说#11处的channelRead方法可能会被多次调用
这个时候可以通过在channelRead方法中将请求数据进行缓存,在channelReadComplete方法再进行处理
另一种比较好的方式就是继承ByteToMessageDecoder重写decode方法
因为在ByteToMessageDecoder#channelRead中子类decode中没有读取掉ByteBuf中的数据的话会被缓存下来
pipeline.fireChannelReadComplete();
当请求的传输的数据比较大时上面的while循环可能会执行多次,也就是说#11处的channelRead方法可能会被多次调用
这个时候可以通过在channelRead方法中将请求数据进行缓存,在channelReadComplete方法再进行处理
另一种比较好的方式就是继承ByteToMessageDecoder重写decode方法
因为在ByteToMessageDecoder#channelRead中子类decode中没有读取掉ByteBuf中的数据的话会被缓存下来
pipeline.fireChannelReadComplete();
14. 如果上面#9中读取的数据小于0,会执行closeOnRead方法,该方法中会判断SocketChannel是不是打开的
是打开的且没有设置ChannelOption.ALLOW_HALF_CLOSURE为true就执行关闭操作
if (close) {
closeOnRead(pipeline);
}
是打开的且没有设置ChannelOption.ALLOW_HALF_CLOSURE为true就执行关闭操作
if (close) {
closeOnRead(pipeline);
}
NioSocketChannel写操作分析
1. 该方法会调用DefaultChannelPipeline链中上一个outbound类型的hander的write方法
最终会执行到HeadContext#write方法,该方法中最终调用的AbstractUnsafe#write
ChannelHandlerContext#write(msg);
最终会执行到HeadContext#write方法,该方法中最终调用的AbstractUnsafe#write
ChannelHandlerContext#write(msg);
1. 取出AbstractUnsafe#outboundBuffer属性,该属性是在创建NioSocketChannel时
实例化NioSocketChannelUnsafe对象时创建的,值为ChannelOutboundBuffer
ChannelOutboundBuffer是一个单向链表,链表中的节点是ChannelOutboundBuffer#Entry对象
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer
实例化NioSocketChannelUnsafe对象时创建的,值为ChannelOutboundBuffer
ChannelOutboundBuffer是一个单向链表,链表中的节点是ChannelOutboundBuffer#Entry对象
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer
2. 如果传入的msg不是ByteBuf或FileRegion类型会抛出UnsupportedOperationException异常
如果是ByteBuf类型不是非堆内存会转化成非堆内存
msg = filterOutboundMessage(msg);
如果是ByteBuf类型不是非堆内存会转化成非堆内存
msg = filterOutboundMessage(msg);
3. 计算msg的字节数赋值给size属性,如果赋值后的size小于0则修改为0
size = pipeline.estimatorHandle().size(msg);
size = pipeline.estimatorHandle().size(msg);
4. msg添加到outboundBuffer单向链表中
outboundBuffer.addMessage(msg, size, promise);
outboundBuffer.addMessage(msg, size, promise);
1. 将msg, size, promise构建成Entry,这个newInstance中使用了Recycler类,使用到了对象池复用技术
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
2. 将entry添加到链表的尾节点
3. ChannelOutboundBuffer中有个AtomicLongFieldUpdater类型的属性totalPendingSize用来存储写入到ChannelOutboundBuffer中的字节总数
如果总字节数超过默认的64KB,则设置通过CAS修改unwritable标志位的值为1(表示通道不可写了默认为0),CAS修改成功后会执行
DefaultChannelPipeline链中的channelWritabilityChanged方法,所以可以在channelWritabilityChanged方法中通过ctx.channel().isWritable()
获取unwritable的值控制数据的写入
incrementPendingOutboundBytes(size, false);
如果总字节数超过默认的64KB,则设置通过CAS修改unwritable标志位的值为1(表示通道不可写了默认为0),CAS修改成功后会执行
DefaultChannelPipeline链中的channelWritabilityChanged方法,所以可以在channelWritabilityChanged方法中通过ctx.channel().isWritable()
获取unwritable的值控制数据的写入
incrementPendingOutboundBytes(size, false);
2. ./1中只是将msg写入到单向链表对象ChannelOutboundBuffer中,这一步才是写入到Socket发送出去
和./1一样,最终会进入AbstractUnsafe类,只不过这里是调用AbstractUnsafe#flush方法
ChannelHandlerContext#flush();
和./1一样,最终会进入AbstractUnsafe类,只不过这里是调用AbstractUnsafe#flush方法
ChannelHandlerContext#flush();
1. ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
2. 首先拿到outboundBuffer链表中未刷新的头结点unflushedEntry 赋值给 flushedEntry
然后循环这些节点尝试设置节点的promise#result属性为UNCANCELLABLE(表示不能执行promise取消了)
如果设置失败说明已经调用promise取消方法了,这时需要将节点取消掉,并将outboundBuffer中totalPendingSize值减去该节点的字节数
如果减去后的totalPendingSize值小于默认最小总字节数,也会触发channelWritabilityChanged方法调用
outboundBuffer.addFlush();
然后循环这些节点尝试设置节点的promise#result属性为UNCANCELLABLE(表示不能执行promise取消了)
如果设置失败说明已经调用promise取消方法了,这时需要将节点取消掉,并将outboundBuffer中totalPendingSize值减去该节点的字节数
如果减去后的totalPendingSize值小于默认最小总字节数,也会触发channelWritabilityChanged方法调用
outboundBuffer.addFlush();
3. 执行刷新操作
flush0();
flush0();
1. 如果已经注册了OP_WRITE事件,就直接返回了(一般不会自己去注册OP_WRITE事件,Socket绝大部分情况下是可以写的,因此注册此事件的作用就不大了)
if (isFlushPending()) {
return;
}
if (isFlushPending()) {
return;
}
2. 调用父类AbstractUnsafe的flush0方法,该方法中会将outboundBuffer中缓存的数据通过通过socket传输给对端
super.flush0();
super.flush0();
0 条评论
下一页