Netty源码分析
2022-10-20 11:48:22 33 举报
AI智能生成
Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。它提供了一套丰富的API,支持多种传输协议,如TCP、UDP和HTTP等。Netty的核心组件包括Channel、EventLoop、ChannelHandler等,通过这些组件可以构建复杂的网络应用程序。Netty的源码结构清晰,易于阅读和理解,是学习和研究网络编程的优秀实践。
作者其他创作
大纲/内容
new NioServerSocketChannel()
newSocket(PROVIDER) -> ServerSocketChannel
this.readInterestOp = SelectionKey.OP_ACCEPT 保存关心的事件为Accept
创建ServerSocketChannel
添加之后的结构
初始化ServerSocketChannel
AbstractChannel.this.eventLoop = eventLoop; 将分配到的EventLoop保存至chanel内部
将Netty-Channel内部的jdk-Channel注册至该EventLoop绑定的selector上,span class=\"equation-text\" contenteditable=\"false\" data-index=\"0\" data-equation=\"此时\
获取注册后的selectionKey作为Netty-Channel的成员变量
注册selector
绑定完毕,触发channelActive事件HeadContext ctx.fireChannelActive();
触发完毕,调用HeadContext.readIfIsAutoRead();
传播至HeadContext.read()-> unsafe.beginRead();-> doBeginRead() 修改SelectionKey感兴趣的事件为创建时保存的兴趣事件
服务端口绑定
注册主要目的用于将register() -> Netty-Channel注册至EventLoop (表现行为为保存EventLoop引用)register0() -> jdk-Channel注册至selector上 (通过jdk的方式注册)
全部由HeadContext节点处理
服务端启动
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory())
execute -> threadFactory.newThread(command).start()
taskQueue = newTaskQueue(this.maxPendingTasks);
selector = openSelector();
chooser = chooserFactory.newChooser(children);
EventExecutorChooser.next()轮询选择EventLoop
创建
SingleThreadEventExecutor.execute(task)
this.thread为null -> 非nio线程 -> doStartThread()
thread = Thread.currentThread();保存由executor分配的线程(FastThreadLocalThread)
SingleThreadEventExecutor.this.run(),在该线程中正式启动EventLoop
executor.execute()创建FastThreadLocalThread
addTask(task) -> taskQueue.offer(task) 添加此任务由eventloop执行
启动
获取最新要执行的定时任务的deadline作为这次select的deadline
hasTasks()为穿插任务,当出现穿插任务时,跳出这次select
selector.select(timeoutMillis) 进行阻塞式select
当检测到未实际阻塞并且超过selectCnt阈值512
newSelector = openSelector();
Register all channels to the new Selector.
触发空轮训bug -> rebuildSelector();
检测IO事件和任务队列
在默认优化的前提下,SelectorImpl的selectedKeys和publicSelectedKeys已被反射替换为new SelectedSelectionKeySet(),该实现通过数组的方式优化了HashSet
processSelectedKeysOptimized(selectedKeys.flip())
处理IO事件
循环获取scheduledTask.deadlineNanos() <= nanoTime的定时任务即将需要执行的定时任务合并至taskQueue
在deadline(ioRatio默认1:1)之内时循环执行taskQueue中的task
任务执行
2 * cpuAbstractBootstrap.doBind0() -> channel.eventLoop().execute -> channel.bind当execute调用时,当分配到的EventLoop尚未启动时触发线程的启动执行在绑定结束后,会触发pipeline的channelActive事件
实际阻塞的select操作未发生阻塞并超过阈值则重新构建selector
当外部线程调用eventLoop或channel方法时将外部线程执行的任务封装成task丢至EventLoop顺序执行
执行逻辑
NioEventLoop
NioServerSocketChannel.doReadMessages()调用accept方法获取jdk SocketChanne
pipeline.fireChannelRead(readBuf.get(i))allocHandle用来控制accept速率
新连接检测
this.readInterestOp = SelectionKey.OP_READ 保存关心的事件为read
ch.configureBlocking(false);
javaSocket.setTcpNoDelay(true);
NioSocketChannel的创建
Channel 层级关系
ChannelConfig 层级关系
Netty中Channel的分类
服务端channel初始化时触发Acceptor的添加ServerBootstrap.init(Channel channel) -> pipeline.addLast(new ServerBootstrapAcceptor())
检测到新连接时触发NioMessageUnsafe.read() -> pipeline.fireChannelRead(readBuf.get(i));
child.pipeline().addLast(childHandler);
对新连接设置options和attrs
next().register(channel)通过chooser选择一个NioEventLoop进行注册
总结:在boss中的NioEventLoop检测到新连接,注册至worker中的NioEventLoop
childGroup.register(channel)新连接注册至workerGroup
入口:pipeline.fireChannelActive();
HeadContext.channelActive(ChannelHandlerContext ctx)
HeadContext.readIfIsAutoRead()
HeadContext.unsafe.beginRead();
AbstractNioChannel.doBeginRead()selectionKey.interestOps(this.readInterestOp)传播结束回到头结点,此时实际设置感兴趣的事件
NioSocketChannel读事件注册
Acceptor的channelRead逻辑
新连接分配NioEventLoop并注册Selector
select检测到新连接
processSelectedKey 处理jdk-客户端channel此时创建Netty-channel保存感兴趣的事件
触发服务端读事件至ServerBootstrapAcceptor.channelRead调用childGroup.register(child),通过choose分配EventLoopNetty-Channel保存分配的EventLoop并将Channel保存的jdk-Channel注册至selector
注册完毕触发ChannelActive事件由HeadContext触发beginRead,此时将开始监听实际感兴趣的事件
eventloop -> processSelectedKeys();
register() 保存选择出来的eventLoop
register0() jdk-channel实际注册,未监听任何事件
beginRead() 监听感兴趣的事件
注册的两个部分
新连接接入
new AbstractChannel(Channel parent)-> pipeline = newChannelPipeline()-> new DefaultChannelPipeline(Channel channel)
pipeline默认结构
pipeline初始化
ChannelPipeline.addLast(ChannelHandler... handlers)
判断是否重复添加checkMultiplicity(handler);
添加至链表addLast0(newCtx);
回调添加完成事件callHandlerAdded0(newCtx);->ctx.handler().handlerAdded(ctx);ctx.setAddComplete();
添加ChannelHandler
删除ChannelHandler
顺序传播
inBound事件的传播
逆序传播
outBound事件的传播
当前节点顺序传播
异常的传播
ChannelHandler分类
根据class类型
根据传播规律添加
ctx当前节点pipeline头尾传播
问题
ChannelPipeline
ByteBuf结构及重要API
ByteBuf分类
heap通过创建数组[]分配内存
direct通过调用jdk-nio创建直接内存ByteBuffer
UnpooledByteBufAllocator
PoolArena - DirectArena - HeapArenaThread通过PoolThreadLocalCache创建的PoolThreadCache与某个Arean绑定PooledByteBufAllocator每次创建时同时创建2种Arean
结构
每个节点为该种规格的RegionCache,通过内部的queue来存储这种规格的内存
MemoryRegionCache
内存规格
Arenas默认大小为2 * cpu核心数Arena用于开辟一块连续内存
Thread - PoolThreadCache - Allocator 三者关系
PoolThreadCache - MemoryRegionCache 关系 memCache用于缓存一块连续内存
PoolThreadCache结构
Chunk结构以及Page切分
PoolArena结构
计算缓存节点,tiny通过除以16得出节点下标拿到MemoryRegionCache
MemoryRegionCache.queue.poll(); 弹出一个EntryinitBuf(); 将弹出的entry所代表的内存分配给ByteBuf-> buf.init() 完成初始化
entry.recycle(); 将弹出的entry丢回对象池(默认只回收1/8)
命中缓存的分配流程
Chunk通过一个平衡二叉树来保存内存分配情况
subpage级别内存分配
未命中缓存的分配流程
缓存的分配流程
PooledByteBufAllocator
ByteBufAllocator内存分配器
拿到MemoryRegionCache节点,添加至队列
当缓存队列满后加入失败,则标记分配到的连续内存为未使用
通过recycle()回收至Recycler
ByteBuf的回收
heap/direct safe/unsafe pooled/unpooled
Allocator持有Arena数组,Arena用于分配内存通过PoolThreadCache将线程与Arena绑定,默认一个Nio线程持管理一个Arena
huge - 直接分配normal - pagesmall/tiny - subpage
总结
ByteBuf
通过cumulation累加字节
未解析数据则跳出循环解析到数据至out时则循环传播解析到的list后clear
ByteToMessageDecoder
直到可读字节数达到一帧则读取ByteBuf至out
FixedLengthFrameDecoder
以\\或\作为分隔符读取一帧
当发现已经超出所设的最大长度时,则丢弃下一个分隔符前的所有字节
LineBasedFrameDecoder
构建时当发现分割符为LineBase则初始化LineBasedFrameDecoder
逻辑同LineBasedFrameDecoder当有多个分隔符时,每次取最小的帧,即以最近的分隔符为截止点
DelimiterBasedFrameDecoder
frameLength += lengthAdjustment + lengthFieldEndOffset;当frameLength大于最大帧限制时,直到将该帧字节全部丢弃完毕才会退出丢弃模式,逻辑同其余解码器
LengthFieldBasedFrameDecoder
通过一定规则累积ByteBuf,当满足一帧时向后传播
如上
Netty解码
通过pipeline调用时从tail节点传播,否则从当前节点传播,见pipeline传播机制
acceptOutboundMessage(msg)I cast = (I) msg;匹配对象
ReferenceCountUtil.release(cast);释放对象
buf.release();释放内存
msg = filterOutboundMessage(msg);检测msg类型是否支持,将buf变为directBuf
添加至buffer
setUnwritable(invokeLater);-> fireChannelWritabilityChanged(invokeLater);当大于64 * 1024时,设置不可写状态
状态
outboundBuffer.addFlush();当总pending字节小于低水位时则设置为可写状态
in.current()拿到flushedEntry的msg
AbstractNioByteChannel.doWrite(ChannelOutboundBuffer in)
in.remove();
当jdk底层无法写入时,之后可能的某个状态
ByteBuf buf = (ByteBuf) msg;
writeAndFlush()
通过编码规则写入ByteBuf,通过ctx或pipeline传递至HeadContext节点
Netty编码
index = InternalThreadLocalMap.nextVariableIndex();每次调用构造函数都分配唯一的index
每次创建都有唯一ID
slowGet() -> ThreadLocal<InternalThreadLocalMap>通过jdk线程变量存储该Map
fastGet((FastThreadLocalThread) thread);直接拿到FastThread内部成员变量Map
每个Thread持有一个InternalThreadLocalMap,为一个数组
每个FastThreadLocal持有一个index,即可在该Thread内的数组中获取该线程变量
不同线程含有不同数组,即ThreadLocal在不同线程之间是隔离的
index为0是variablesToRemoveIndex,故实际有效下标从1开始
每个Thread维护一个数组
slowGet() - fastGet()
Object v = threadLocalMap.indexedVariable(index);直接根据索引号从该线程所持有的数组中获取value
当获取的值为null时调用 -> initialValue();随后将该值设入该线程所持有的数组中
get()实现
获取map
设置值后调用addToVariablesToRemove
remove时将该index位置设置为UNSET当remove调的值不是UNSET时调用onRemoval(v)removeFromVariablesToRemove同时将0位置的set中的该FastThreadLocal引用移除
set()实现
FastThreadLocal
FastThreadLocal<Stack<T>>每个线程持有一个Stack
maxCapacity = 32kratioMask = 7 即只回收1/8的对象maxDelayedQueues = 2 * cpuavaliable = 32 / 2 = 16k
获取线程变量Stack
boolean WeakOrderQueue.transfer(Stack<?> dst)每次transfer转移一个Link块内的数据
scavengeSome()
stack.pop()
stack.pop()从Stack弹出一个handle
当handle为空时,创建一个handle并调用newObject()创建一个对象与handle绑定
recycler.get()
默认情况只回收1/8的未被回收过的对象
直接放入stack的【DefaultHandle<?>[] elements】中
同线程回收对象pushNow(item);
每一个link包含一个handles,默认大小为16每次分配一个link,即批量分配可回收的handle空槽
每次创建WeakOrderQueue都插入head的头部原始Stack就可以通过单向链表获得外部线程回收的对象
绑定关系
当Link满时申请空间后创建一个linktail.elements[writeIndex] = handle; 在link中存储该handlehandle.stack = null; handle存储于Link,此时已不属于原始Stack
回收对象handle.recycle(this);stack.push(this);
Recycler
Netty性能优化工具类解析
ReadTimeoutException
MqttEncoder
单例模式
PowerOfTowEventExecutorChooser
GenericEventExecutorChooser
DefaultEventExecutorChooserFactory.newChooser(EventExecutor[])
策略模式
WrappedByteBuf及其子类
装饰器模式
ChannelFuture为被观察者addListener添加监听器即观察者
writeAndFlush()Promise为被观察者Future为观察者
观察者模式
Bytebuf.foreach
迭代器模式
责任处理器接口ChannelHandler
责任链ChannelPipeline
上下文ChannelHandlerContext通过ctx next/prev构成双向链表
责任终止机制netty - fireother - return false
Pipeline
责任链模式
Netty设计模式应用
单机调优
耗时任务需要单独的线程池
应用调优
优化
Netty源码分析
0 条评论
下一页