Netty框架
2022-01-10 17:01:15 339 举报
AI智能生成
Netty知识总结
作者其他创作
大纲/内容
事件传播
InBound
事件从head传播到tail
AbstractNioByteChannel.read()
DefaultChannelPipeline.fireChannelRead()
head
HeadContext
ChannelHandlerContext
head.invokeChannelRead()
ChannelInboundHandler.channelRead()
ctx.fireChannelRead()
fireChannelRead()
next
ChannelHandlerContext
next.invokeChannelRead()
OutBound
从当前Handler传播到head
ChannelHandlerContext.write()
next
ChannelHandlerContext
next.invokeWrite()
invokeWrite0
ChannelOutboundHandler.write()
ChannelHandlerContext
ctx.write()
write()
从tail传播到head
Channel.write()
ChannelPipeline.write()
ChannelPipeline.write()
TailContext
ChannelHandlerContext
tail.write()
内存管理
数据结构
PoolArea
PoolChunk
order-0 block
8K
order-1 block
16 K
order-2 block
32 K
...
order-11 block
16M
一大块连续的内存
16M
ByteBuf
参考
Netty中ByteBuf内存泄露及释放解析
创建
Netty的接收和发送ByteBuf采用的DirectBuffers,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝
如果使用传统的堆内存(heap buffers)进行socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,
然后才写入socket中。相比堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
然后才写入socket中。相比堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
池化
先申请一块大内存池,在内存池中分配空间,对于这种应用级别的内存二次分配,
就需要手动对池化的ByteBuf进行释放,否则就有可能出现内存泄露的问题。
就需要手动对池化的ByteBuf进行释放,否则就有可能出现内存泄露的问题。
abstract
PooledByteBuf
PooledByteBuf
PooledHeapByteBuf
PooledDirectByteBuf
PooledByteBufAllocator
ChannelHandlerContext.alloc()
非池化
每次I/O读写都会创建一个新ByteBuf,频繁进行大块内存的分配和回收对性能有一定影响,
非池化的ByteBuf可以通过JVM GC自动回收,
也推荐手动回收UnpooledDirectByteBuf等使用堆外内存的ByteBuf
非池化的ByteBuf可以通过JVM GC自动回收,
也推荐手动回收UnpooledDirectByteBuf等使用堆外内存的ByteBuf
UnpooledHeapByteBuf
Java堆内存
推荐业务数据的内存分配使用
UnpooledDirectByteBuf
UnpooledByteBufAllocator
Unpooled
Unpooled.directBuffer()
Unpooled.buffer()
Unpooled.copiedBuffer()
类图
常用方法
order()
字节序
高位字节序:高位字节在前,低位字节在后(内存地址低位在前,高位地址在后)。
低位字节序:低位字节在前,高位字节在后(内存地址低位在前,高位地址在后)。
netty中默认字节序是大端字节序,即字节高位在前,低位在后,符合人类的书写习惯。
大端字节序
readInt()
writeInt()
小端字节序
readIntLE()
writeLE()
discardReadBytes()
把已读的区域数据给丢弃掉,其实就是把这部分区域给回收了,
把读写索引都往前移,可写区域就大了
把读写索引都往前移,可写区域就大了
clear()
这个并不是清除数据,只是重置读写索引到0,可写区域又变大了
复制与共享
copy()
返回ByteBuf的可读字节的拷贝。
修改返回的ByteBuf内容与当前ByteBuf完全不会相互影响。
此方法不会修改当前ByteBuf的readerIndex或writerIndex
修改返回的ByteBuf内容与当前ByteBuf完全不会相互影响。
此方法不会修改当前ByteBuf的readerIndex或writerIndex
完整拷贝,与原buffer是两份数据
duplicate()
返回ByteBuf可读字节的一部分。
修改返回的ByteBuf或当前ByteBuf会影响彼此的内容,
同时它们维护单独的索引和标记,此方法不会修改当前ByteBuf的readerIndex或writerIndex
另请注意,此方法不会调用{@link #retain()},因此不会增加引用计数。
修改返回的ByteBuf或当前ByteBuf会影响彼此的内容,
同时它们维护单独的索引和标记,此方法不会修改当前ByteBuf的readerIndex或writerIndex
另请注意,此方法不会调用{@link #retain()},因此不会增加引用计数。
浅拷贝,与原buffer是共享数据的
slice()
返回共享当前ByteBuf信息的新ByteBuf,他们使用独立的readIndex writeIndex markIndex
修改返回的ByteBuf或当前ByteBuf会影响彼此的内容,同时它们维护单独的索引和标记,
此方法不会修改当前ByteBuf的readerIndex或writerIndex,
另请注意,此方法不会调用{@link #retain()},因此不会增加引用计数
修改返回的ByteBuf或当前ByteBuf会影响彼此的内容,同时它们维护单独的索引和标记,
此方法不会修改当前ByteBuf的readerIndex或writerIndex,
另请注意,此方法不会调用{@link #retain()},因此不会增加引用计数
浅拷贝,与原buffer是共享数据的
引用计数相关
ReferenceCounted
ReferenceCounted
release()
引用计数减少1
retain()
引用计数增加1
refCnt()
获取引用计数
释放时机
释放原则
谁最后访问ByteBuf,谁最后负责释放
发送组件将ByteBuf传递给接收组件,发送组件一般不负责释放,由接收组件释放
如果一个组件除了接收处理ByteBUf,而不做其他操作(比如再传给其他组件),这个组件负责释放ByteBuf。
在 ChannelHandler负责链中,如何释放
如果ChannelHandler中,只有处理ByteBuf的操作,不会调ctx.fireChannelRead(buf)把
ByteBuf传递下去,那就要在这个ChannelHandler中释放ByteBuf。
ByteBuf传递下去,那就要在这个ChannelHandler中释放ByteBuf。
如果ChannelHandler中,会调ctx.fireChannelRead(buf)把ByteBuf传递给下一个ChannelHandler,
那在当前ChannelHandler中不需要释放ByteBuf,由最后一个使用该ByteBuf的ChannelHandler释放。
那在当前ChannelHandler中不需要释放ByteBuf,由最后一个使用该ByteBuf的ChannelHandler释放。
如果处理的ByteBuf是由decode()等会增加计数器的操作生成的,不再传递时,ByteBuf也要释放。
如果不确定要不要释放,或者简化释放的过程,可以调用ReferenceCountUtil.release(ByteBuf)函数。
也可以把ChannelHandler都承继自SimpleChannelInboundHandler虚类,该类会
在channelRead函数中调用ReferenceCountUtil.release(msg)来帮助释放ByteBuf
在channelRead函数中调用ReferenceCountUtil.release(msg)来帮助释放ByteBuf
请求 ByteBuf
自动释放
请求ByteBuf,调用fireChannelRead()传播,由TailContext释放。
Handler继承SimpleChannelInboundHandler可在ChannelRead中自动释放
手动释放
如果中途没有使用fireChannelRead传递下去也要自己释放
通过Channel或ChannelHandlerContext创建的但是没有传递下去的ByteBuf也要手动释放
响应 ByteBuf
响应ByteBuf都是由应用程序产生的,由Netty负责释放。
内存泄露监测
ResourceLeakDetector
检测级别
DISABLE
完全禁用内存泄露检测,不推荐
SIMPLE(默认)
抽样1%的ByteBuf,提示是否有内存泄露
ADVANCED
抽样1%的ByteBuf,提示哪里产生了内存泄露
PARANOID
对每一个ByteBu进行检测,提示哪里产生了内存泄露
参数设置
java -Dio.netty.leakDetectionLevel=ADVANCED
工具类
ByteBufUtil
threadLocalDirectBuffer()
ReferenceCountUtil
safeRelease(ByteBuf)
release(ByteBuf)
重点话题
消息资源管理
Netty如何管理资源
业务逻辑如何处理消息
特定用例特定传输的各种因素和场景
NIO
非阻塞传输,基于java.nio
Epoll
非阻塞传输,基于JNI的epoll,只支持linux,性能优于NIO。linux-2.5.44(2002)+
OIO
阻塞传输,基于java.net
Local
本地传输,在VM内部通过管道通讯
Embedded
测试使用
发送消息方式
ChannelHandlerContext.write()
ChannelPipeline.write()
Channel.write()
Channel关闭方式
Channel.close()
ChannelPipeline.close()
TailContext.close()
nextContextOutbound.close()
ChannelHandlerContext.close()
nextContextOutbound.close()
入站接收缓冲区
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 1024, 65536))
.option(ChannelOption.SO_RCVBUF, 1024*4)
.option(ChannelOption.RCVBUF_ALLOCATOR,new FixedRecvByteBufAllocator(4096))
//FixedRecvByteBufAllocator也可以换为AdaptiveRecvByteBufAllocator
.option(ChannelOption.RCVBUF_ALLOCATOR,new FixedRecvByteBufAllocator(4096))
//FixedRecvByteBufAllocator也可以换为AdaptiveRecvByteBufAllocator
出站写缓冲区
ChannelOutboundBuffer
write 操作
将需要写的 ByteBuff 存储到 ChannelOutboundBuffer 中
flush 操作
从 ChannelOutboundBuffer 中将需要发送的数据读出来,并通过 Channel 发送出去。
写操作配置
高水位
write操作后,等待发送消息链表增加后,长度大于高水位线,channel被设置为不可写(unwritable)
ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
默认: 64 * 1024 个消息
如果在write()时候,没有做判断 channel.isWritable() 的,就跟没设置一样!!!
Channel 可写状态变化通知回调方法
ChannelInboundHandler.channelWritabilityChanged()
低水位
flush操作时前,等待发送消息链表减少后,长度小于低水位线,channel被设置为可写(writable)
ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
默认: 32 * 1024 个消息
maxWriteSize
默认: 4M
maxGlobalWriteSize
默认: 400M
maxWriteDelay
默认: 4s
TCP粘包/拆包
参考
Netty 中的粘包和拆包
TCP是以流动的方式传输数据,传输的最小单位为一个报文段(segment)
出现原因
Socket 缓冲区与滑动窗口
Socket 的内核发送缓冲区
(SO_SNDBUF )
(SO_SNDBUF )
Socket 的内核接收缓冲区
(SO_RCVBUF)
(SO_RCVBUF)
滑动窗口
MTU
最大传输单元
Maxitum Transmission Unit
Maxitum Transmission Unit
链路层对一次可以发送的最大数据的限制
操作系统MTU配置
路由器MTU配置
MSS
最大分段大小
Maxitum Segment Size
Maxitum Segment Size
TCP 报文中 data 部分的最大长度,是传输层对一次可以发送的最大数据的限制。
MSS = MTU(1500) -IP Header(20 or 40)-TCP Header(20)
Nagle算法
Nagle 算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块
Nagle 算法的规则
流量整形
实现了控制读取速度、判断发送数据需要的时间、统计周期内读写的字节
ChannelTrafficShapingHandler
单个Channel
GlobalTrafficShapingHandler
所有Channel
GlobalChannelTrafficShapingHandler
所有Channel
TrafficCounter
计算读取和发送的等待时间以及监控读取发送的字节数
启动类
ServerBootstrap
EventLoopGroup
NioEventLoopGroup
shutdownGracefully()
Future<?>
ServerChannel
NioServerSocketChannel
member
ChannelPipeline
ChannelHandler
abstract
ChannelInitializer<SocketChannel>
ChannelInitializer<SocketChannel>
initChannel()
bind()
ChannelFuture
EventLoop
控制流、多线程处理、并发
EventLoopGroup
NioEventLoopGroup
EpollEventLoopGroup
OioEventLoopGroup
EventExecutorGroup
ChannelPipeline
Future
ChannelFuture
异步通知
sync()
channel()
ChannelFutureListener
operationComplete
ScheduledFuture
Channel
通讯连接通道,线程安全的
ServerChannel
传输类型
NioServerSocketChannel
非阻塞传输
EpollServerSocketChannel
非阻塞传输
OioServerSocketChannel
阻塞传输
Local
本地传输
Embedded
测试
SocketChannel
NioSocketChannel
method
config()
ChannelConfig
pipeline()
ChannelPipeline
eventLoop()
EventLoop
closeFuture()
ChannelFuture
bind()
connect()
read()
write()
写入ChannelPipeline缓存
flush()
ChannelHandlerContext
pipeline()
ChannelPipeline
channel()
Channel
executor()
EventExecutor
close()
ChannelFuture
alloc()
ByteBufAllocator
ChannelHandler
interface:
ChannelInboundHandler
ChannelInboundHandler
abstract:
ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter
method
channelActive()
channelRead()
channelReadComplete()
userEventTriggered()
exceptionCaught()
...
abstract
ChannelInitializer<C extends Channel>
ChannelInitializer<C extends Channel>
initChannel()
abstract
SimpleChannelInboundHandler<I>
SimpleChannelInboundHandler<I>
channelRead0()
类型匹配则自动释放byteBuf引用
interface:
ChannelOutboundHandler
abstract:
ChannelOutboundHandlerAdapter
class
WriteTimeoutHandler
WriteTimeoutHandler
abstract:
ChannelDuplexHandler
class
IdleStateHandler
IdleStateHandler
class
ReadTimeoutHandler
ReadTimeoutHandler
annotation
@Sharable
解码器
abstract
ByteToMessageDecoder
ByteToMessageDecoder
abstract
MessageToMessageDecoder<I>
MessageToMessageDecoder<I>
编码器
abstract
MessageToByteEncoder
MessageToByteEncoder
abstract
MessageToMessageEncoder<I>
MessageToMessageEncoder<I>
编解码器
abstract:
MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
分类
数据粘包/拆包处理
加解密处理
业务流程处理
优点
分层处理,网络处理和业务处理分离
0 条评论
下一页