netty
2021-07-05 18:29:09 0 举报
AI智能生成
netty源码解析
作者其他创作
大纲/内容
eventloop
NioEventLoop创建,入口new NioEventLoopGroup()
创建线程执行器,new ThreadPerTaskExecutor(newDefaultThreadFactory())
线程执行器的作用是每次执行任务都创建一个FastThreadLocalThread,该类是对Thread的封装
使用DefaultThreadFactory工厂方法创建的线程命名规则为nioEventLoop-1-xx
循环创建eventLoop并使用数组保存,默认数量为cpu核数*2,children=for() {new child()} —> new NioEventLoop()
保存线程执行器ThreadPerTaskExecutor
创建一个MpscQueue并保存,newTaskQueue(queueFactory),每个EventLoop对应一个MpscQueue,用于执行普通任务
创建一个Selector并保存,openSelector(),每个EventLoop对应一个Selector
openSelector()方法会对selected keySet进行优化,优化过程是SelectedSelectionKeySet存储selected keySet,
SelectedSelectionKeySet的原理是用数组去替换nio底层的HashSet,对keySet的添加操作时间复杂度变成O(1)。
SelectedSelectionKeySet的原理是用数组去替换nio底层的HashSet,对keySet的添加操作时间复杂度变成O(1)。
创建线程选择器封装eventloop数组,chooserFactory.newChooser(children)
线程选择器的作用是给新连接绑定EventLoop,下面两种算法其实都是对EventLoopGroup的线程数取模,优化过程是通过与运算更迅速地取模
判断EventLoopGroup线程数是否2的幂等,isPowerOfTwo
是,PowerOfTwoEventExecutorChooser,index++&(length - 1)
不是,GenericEventExecutorChooser,abs(index++%length)
NioEventLoop启动
服务端启动,入口AbstractBootstrap.doBind()
channel.eventLoop().execute()
判断当前线程是否EventLoop线程,不是的话就调用startThread()创建线程
把创建的线程和EventLoop绑定,ThreadPerTaskExecutor.execute(thread = Thread.currentThread())
SingleThreadEventExecutor.this.run()启动
客户端启动,入口Bootstrap.connect()
NioEventLoop执行,入口NioEventLoop.run()
select() 检查是否有IO事件
判断是否有到期的定时任务,是的话调用selectNow()立即返回
判断taskQueue是否为空,是的话调用selectNow()立即返回
没有的话就阻塞式select(timeoutMillis),直到有感兴趣的事件触发
事件触发后判断耗时,避免jdk空轮询的bug
processSelectedKeys() 处理IO事件
processSelectedKeysOptimized()遍历优化后的keySet,按照具体的key类型进行处理
accept事件,NioMessageUnsafe.read()
read事件,NioByteUnsafe.read()
runAllTasks() 处理异步任务队列
task的分类和添加,task分为普通任务和定时任务,在不同的队列里面
Mpsc队列,基于Jctools工具,普通任务队列,在new EventLoop()的时候创建,调用EventLoop.execute()方法的时候添加
scheduledTask队列,基于PriorityQueue,在调用schedule()方法的时候添加
task的聚合,SingleThreadEventExecutor.fetchFromScheduledTaskQueue()把定时任务放到普通任务队列里面
task的执行,SingleThreadEventExecutor.runAllTasks()从普通任务队列取出task执行,
每执行完64个任务判断一下是否超时,超时或者没有task可执行后就返回
每执行完64个任务判断一下是否超时,超时或者没有task可执行后就返回
服务端启动
创建服务端Channel,入口bind()
initAndRegister()初始化并注册
newChannel()通过反射来创建服务端channel,最终调到
NioServerSocketChannel的构造函数,进入newSocket()方法
NioServerSocketChannel的构造函数,进入newSocket()方法
newSocket()通过jdk底层来创建jdk channel
AbstractNioChannel(),创建并保存id、unsafe、pipeline等组件,配置阻塞模式为非阻塞,
保存感兴趣事件为accept事件(未注册到selector)
保存感兴趣事件为accept事件(未注册到selector)
NioServerSocketChannelConfig(),配置channel相关的tcp参数配置
初始化服务端Channel,入口init()
设置服务端options和attributes
设置服务端handler
在pipeline中添加ServerBootstrapAcceptor,这是一个特殊的InboundHandler,作用是给初始化新建的客户端channel并绑定eventloop
注册服务端Channel,入口AbstractChannel.reister(channel)
绑定服务端eventloop,this.eventloop = eventloop,绑定eventloop
注册selector上,AbstractChannel.register0()
AbstractNioChannel.doRegister() 把jdk底层的channel注册到一个selector上,
并把netty的channel作为jdk底层channel的一个attachment
并把netty的channel作为jdk底层channel的一个attachment
invokeHandlerAddedIfNeeded() 进行事件的回调
fireChannelRegistered() 把注册成功的信息往pipeline传递
端口绑定和注册事件,入口AbstractUnsafe.bind()
doBind()
javaChannel().bind(),调用jdk底层进行本地端口绑定
pipeline.fireChannelActive()
HeadContext.readIfIsAutoRead(),向selector注册accept事件
处理客户端新连接
检测新连接,入口NioEventLoop.processSelectedKey()
检测出accept事件,调用NioMessageUnsafe.read()
doReadMessages(),创建jdk的channel,并封装成netty的channel
allocHandle,控制创建速率
创建NioSocketChannel,入口doReadMessages()的new NioSocketChannel()
AbstractNioByteChannel()
configureBlocking(false) 设置阻塞模式为非阻塞,
设置保存感兴趣的事件类型为op_read
设置保存感兴趣的事件类型为op_read
创建并保存id、unsafe、pipeline到channel
new NioSocketChannelConfig()
setTcpNoDelay(true),禁止Nagle算法
初始化NioSocketChannel,ServerBootstrapAcceptor.read()
设置childOptions和childAttrs
添加childHandler
注册NioSocketChannel,入口EventLoopGroup.register()
选择eventLoop进行绑定,AbstractChannel.this.eventLoop = eventLoop;
注册selector,此时不关注任何事件,AbstractNioChannel.doRegister()
注册事件,入口AbstractNioChannel.doBeginRead()
向selector注册读事件
pipeline
pipeline初始化
pipeline在创建channel的时候被创建,入口AbstractChannel.newChannelPipeline()
pipeline的每一个节点都是一个ChannelHandlerContext
每个ChannelHandlerContext包含一个ChannelHandler
ChannelHandlerContext支持属性存储、读事件传播,写事件传播等功能
特殊的ChannelHandlerContext:HeadContext和TailContext
TailContext,特殊的InboundHandler,最终捕获没有处理的异常、读事件
HeadContext,特殊的OutboundHandler,使用unsafe处理后面传上来的写事件
添加和删除ChannelHandler
入口DefaultChannelPipeline.addLast()
添加和删除的过程,其实是添加和删除ChannelHandlerContext的过程
判断是否重复添加(判断是否Shared和名字是否重复),checkMultiplicity(handler)
创建节点并添加到链表,newCtx = newContext();addLast0(newCtx)
添加完成回调事件,callHandlerAdded0(),例如ChannelInitializer在添加完成后将自身删除
删除完成回调事件,handlerRemoved()
事件的传播
inbound事件的传播
inboundHandler针对channel的被动触发事件,处理register、active、read等事件。
read()事件的传播,从head往tail的inboundHandler里面传播
ctx.read(),从当前节点开始往后传播
ctx.channel().read(),从head节点开始往后传播
SimpleInboundHandler,可以自动释放byte的内存,不需手动释放
outbound事件的传播
outboundHandler针对用户主动发起的动作,处理bind、connect、write、flush等事件
write()事件的传播,从tail往head的outboundHandler里面传播
ctx.write(),从当前节点开始往前传播
ctx.channel().write(),从tail节点开始往前传播
异常事件的传播
异常的触发链,从head往tail传播,和inbound或outbound类型没关系
异常处理的最佳实践:创建一个ExceptionHandler添加到pipeline的最后进行统一异常处理
bytebuf
Bytebuf的结构
结构变量
readIndex,读指针
writeIndex,写指针
capacity,容量
maxCapacity,可扩展的最大容量
引用计数
默认情况下,当创建完一个 ByteBuf 时,它的引用为1
每次调用 retain()方法, 它的引用就加 1
每次调用 release() 方法,是将引用计数减 1
如果引用为0,表示这个 ByteBuf 没有地方被引用到,需要回收内存。再次访问这个 ByteBuf 对象,将会抛出异常。
在调用浅层复制(slice/duplicate)时,可以通过调用一次 retain() 方法 来增加引用,表示它们对应的底层的内存多了一次引用,引用计数为2,在浅层复制实例用完后,需要调用一次 release() 方法,将引用计数减一,不影响源ByteBuf的内存释放。
api
byte=1字节,short=2字节,medium=3字节,int=4字节
readXXX,writeXXX,会移动指针
getXXX,setXXX,不移动指针
mark()记录指针位置,reset()重置指针位置
retain()增加refCnt次数,release()减少refCnt次数
duplicate()是整个原buffer的引用,slice()是对原buffer的readerIndex到writeIndex区间即readable bytes的引用,
两者都是和原buffer共享内存数据和引用计数refCnt,但是有独立的读写指针
两者都是和原buffer共享内存数据和引用计数refCnt,但是有独立的读写指针
copy()方法,原buffer的完全拷贝,和原buffer完全独立
Bytebuf分类
pooled和unpooled
pooled池化,实现准备好内存池,在内存池里面分配内存,用完可以回收重复使用
unpooled非池化,直接分配内存。
unsafe和非unsafe
unsafe使用java的unsafe对象分配,分配的速度会快很多
非unsafe没有依赖unsafe对象
heap和direct
heap使用堆内存,使用数组分配
direct使用堆外内存,使用nio的DirectBuffer进行分配。
ByteBufAllocator分类
AbstractByteBufAllocator
UnPooledByteBufAllocator
unsafe heap:使用数组array[]的实际地址+偏移量读写对象。
非unsafe heap:使用数组array[]+下标读写对象。
unsafe direct:使用memoryAddress+偏移量读写对象。
非unsafe direct:使用nio的ByteBuffer+下标读写对象。
PooledByteBufAllocator
每个线程有一个局部缓存PoolThreadCache,每个PoolThreadCache有directArena和heapArena两种内存分配器,每种内存分配器的数量和线程数量相同(CPU核数 * 2),意味着每个线程都会被分配一个单独的directArena和heapArena。
内存规格
内存规格名称
Chunk,16MB,是Netty向操作系统申请内存的最小调度单位,意味着每次向操作系统申请内存最小为16mB。
Page,8kb,是给ByteBuf分配内存的最小调度单位,需要申请的内存大小(必定是2的幂次方) >= 8kb时,会先取一个chunk,然后会以page级别分配内存,最后将当前chunk标记为“使用了一个部分”,然后放进对应占用率的chunkList。
SubPage,大小不限制,需要申请的内存大小(必定是2的幂次方)< 8kb时,比如现在需要size=2kb,则会先取一个chunk,然后再取一个page,然后将page分成4份(pageSize/size),每一份为2kb,然后取其中一份给ByteBuf初始化。
内存大小
tiny,size < 512B
small,512B <= size < 8kB
normal,8kB <= size <= 16mB
huge,size > 16mB
PoolArena分配内存的过程
通过Recycle创建PooledByteBuf对象,创建出来的PooledByteBuf对象会进行一个reset操作
为PooledByteBuf请求分配内存时,会先尝试从缓存中取出相应内存,如果缓存没有,则穿透到Arena中分配内存。
ByteBuf回收
非pooled,则会直接销毁目标内存块
pooled,放入当前线程的缓存
零拷贝
系统级别的零拷贝
封装nio的FileChannel,FileChannel的map()和transferTo()方法
封装了底层的mmap和sendfile系统调用,实现系统级别的零拷贝
封装了底层的mmap和sendfile系统调用,实现系统级别的零拷贝
应用级别的零拷贝
使用Direct Buffer进行Socket读写,可以减少一次从堆内内存到直接内存的拷贝
使用CompositeByteBuf获得多个ByteBuf的聚合视图,使用dunplicate()、slice()获得原ByteBuf的视图,减少ByteBuf间的拷贝
收藏
收藏
0 条评论
下一页