netty高性能网络框架涉及的核心组件
2021-07-20 15:48:30 0 举报
netty源码解析,netty高性能网络框架涉及的核心组件,netty架构,netty原理
作者其他创作
大纲/内容
写入
三者之间的线程安全问题
JDK Selector
Channel自己的一些属性
创建
io.netty.channel public interface ChannelPipelineChannel创建的时候这个也会创建是一个ChannelHandler的双向列表(容器),就是event Handler,里面的一个个事件处理器就是Concurrent event Handler,具体的事件是容器元素处理的,不是容器处理的传统的拦截器入和出都要经过,netty是细化的特殊的拦截器,不是双向的,入站的不会管出去的入站处理器会处理IO线程生成的入站数据,从远端读取的,如果没有处理器就走到结尾丢掉出站事件到达最后就会被IO线程写到远端。 ChannelPipeline p = ...; p.addLast(\"1\
netty高性能网络框架原理&源码解析By deltaqin
ChannelHandlerA
AttributeKey维护业务数据流程中后续可以获取
ChannelPipeline、ChannelHandlerContext 都是线程安全的,因为同一个 Channel 的事件都会在一个线程中处理完毕(假设用户不自己启动线程)。但是,ChannelHandler 却不一定,ChannelHandler 类似于 Spring MVC 中的 Service 层,专门处理业务逻辑的地方,一个 ChannelHandler 实例可以供多个 Channel 使用,所以,不建议把有状态的变量放在 ChannelHandler 中,而是放在消息本身或者 ChannelHandlerContext 中。
实例
addLast0(newCtx);完成链表的改变,也就是链表中间插入元素
AbstractChannelHandlerContext.handler().handlerAdded(ctx);这个方法是我们自己可以重写的,标记当前的Handler的包装Context已经加入到pipeline里面了
eventLoop(管理一个线程)死循环监听处理事件
使用
1对1
2共享数据
涉及数据的复制,不推荐
最后调用
JDK提供的Future只能手动检查执行结果,而且是阻塞的。(get或者isDone)netty则对Future进行了增强,使用了观察者模式,一个Future上注册了很多感兴趣的Listener,会调用每一个每一个Listener的Complete方法,会传递自己,自己可以获取到Channel,拿到Channel就可以任意操作promise继承Future,只能写一次
ChannelHandlerContext连接ChannelHandler以及pipeline的纽带通知pipeline里面下一个Handler,ChannelHandler handler();可以获取对应的Handler,保存着 Channel 的上下文Channel channel(); 获取Channel对象的引用ChannelPipeline pipeline();获取pipeline,也可以修改pipeLine所属的对象
1系统设置
等待结束,一定是执行结束了,Future有结果了
数据
实际存放
Channel以下是连接相关的
DirectByteBuffer:先预订内存,如果内存不够,会进行清理,并尝试几次,调用unsafe的allocateMemory(size)方法来分配内存;初始化这片内存的值为0
Channel
HeapByteBuffer:byte 数组来实现的
NoCleaner释放内存引用计数法
一般是一对一如果添加到pipeline多次,就是多对一
addLast方法的详解:往pipeline的链表最后加一个HandlerContext。源码启动流程中每一个Channel一开始都会使用此方法添加ChannelInitializer,我们自己写的addLast也是这个方法
抽象类实现类分类: 堆上的直接内存的Heap 和 Direct 池化的和非池化的Pooled 和 Unpooled 安全和非安全Safe 和 Unsafe(类名不带 Unsafe 的即为 Safe)所以组合一共8种,ByteBuf 并没有类似于 CharBuf、IntBuf 这样的兄弟类。UnPooledHeapByteBuf 使用的是数组下标的方式修改数组内容(拆解int)UnPooledUnSafeHeapByteBuf 通过UnSafe直接修改数组的内存内容PooledDirectByteBufPooledUnSafeDirectByteBuf PooledHeapByteBuf PooledUnSafeHeapByteBuf
public class ByteBufTest { public static void main(String[] args) { // 1. 创建池化的分配器 ByteBufAllocator allocator = new PooledByteBufAllocator(false); // 2. 分配一个40B的ByteBuf ByteBuf byteBuf = allocator.heapBuffer(40); // 3. 写入数据 byteBuf.writeInt(4); // 4. 读取数据 System.out.println(byteBuf.readInt()); // 5. 回收内存 ReferenceCountUtil.release(byteBuf); // 6. 分配一个30B的ByteBuf(从线程缓存获取 ) ByteBuf byteBuf2 = allocator.heapBuffer(30); // 7. 再次分配一个40B的ByteBuf(不从线程缓存获取) ByteBuf byteBuf3 = allocator.heapBuffer(40); }}
AbstractConstant
源码剖析
拿来即用的参数,使用这些参数能够让我们以类型安全地方式来配置 Channel
内存池划分:如果是向 JVM 申请的内存,那就是堆内存池,如果是向操作系统申请的内存,那就是直接内存池。业界比较著名的内存分配器很多,jemalloc快速分配和回收、内存碎片少、支持性能分析,Netty 中的是原生 jemalloc 在 Java 中的一种实现方式jemalloc内存分配器
负责创建
attr()方法作用域(4.1改进):AttributeMap是共用的同一个Channel上的所有的HandlerContext是共享attr的源码的Context最后还是调用了Channel的attr,Channel的又是调用自己的父类Map的attr方法
constant配置信息以及业务信息
实现
netty编写涉及的核心组件
易混5点:1. 一个事件循环组会有多个事件循环2. 一个事件循环在整个生命周期只会有唯一一个Thread与之绑定3. 所有事件循环所处理的IO事件都将在所关联的Thread上处理4. 一个Channel在他的生命周期只会注册到一个事件循环上5. 一个事件循环在运行过程中,会被分配给一个或多个Channel(也就是会有多个Channel注册上来)
Channel对 Java 原生 Channel 的进一步封装,更简单功能丰富
ChannelPromise可写入的ChannelFuture,父子关系
DefaultChannelOption
接口 netty Future为什么不直接使用jdk??对listener的操作,观察者模式。特定操作结束就会调用,不需要手动调用,jdk的需要手动调用监听者GenericFutureListener实现的接口的方法operationComplete会被自动调用
eventLoopGroup内部维护一个eventLoop(相当于newSingleThreadExecutor())组成的 数组
ChannelHandler包含了入站和出站两种ChannelInboundHandler 和 ChannelOutboundHandler,处理不同的数据或拦截 IO 事件,并将其转发到 ChannelPipeline 中的下一个 ChannelHandler,运用的是责任链设计模式。一般使用抽象类:1. SimpleChannelInboundHandler:处理入站事件,不建议直接使用 ChannelInboundHandlerAdapter,前者可以自动释放2. ChannelOutboundHandlerAdapter:处理出站事件3. ChannelDuplexHandler:双向的
ByteBufNetty 在 JDK Buffer 之上新的缓冲区,语义清晰好用
事件循环(eventLoop--》eventExecutor)理解为只有一个线程的线程池,newSingleThreadExecutor()。有自己的任务队列,所以叫做管理一个线程
ConstantPool创建并且管理一些常量ConcurrentMap
Netty的ByteBuf是如何支持堆内存非池化实现的直接操作堆内存的Unsafe比之前只是操作字节数组拆分为4个字节放一个int的方式更加高效,省去了拆解的过程非UnSafe和原JDK的一样,int拆为4个字节,一一写到字节数组里UnSafe(默认):创建:UnpooledUnsafeHeapByteBuf 底层还是使用的 Java 原生的 byte 数组来实现的写入:span style=\
方法:writeXxx ()/readXxx ():写入、读取数据setXxx ()/getXxx ():根据索引写入、读取数据其它:返回 ByteBuf 的状态、遍历等方法,比如返回容量、是否可读等方法 public abstract ByteBufAllocator alloc();
java.util.concurrent.Future代表着异步计算的结果,提供了计算是否完成的 check,等待完成,取回等多种方法。isDone主动查询,阻塞;get,会一直阻塞到结果计算完成。cancel,一旦计算完成,就无法被取消了
value
使用等待超时实现连接超时
AbstractByteBufAllocator(boolean preferDirect) 构造器里面的directByDefault = preferDirect && PlatformDependent.hasUnsafe(); 判断如果是想用直接内存而且平台有UnSafe,就会默认使用直接内存。并不是说这里传了 preferDirect=true,最后就一定会创建直接内存形式的 ByteBuf ,因为:JDK 各版本之间并不完全兼容。比如低版本的 JDK 可能就不支持使用直接内存。不同的设备支持的功能不一样。安卓可能就不支持直接内存。不同类型的 JDK 支持的功能不一样。OpenJDK 相对来说会缺失一些功能。直接使用 Java 原生的方法创建了一个 DirectByteBuffer,可能换一个平台就无法运行,但是,使用 Netty 不会出现这种情况,它可能会退化成 HeapByteBuffer:
内存回收非池化的堆上和直接内存的ByteBuf,释放的时候堆上直接使用null等待GC,直接内存则是使用Cleaner,也就是和上面的DirectByteBuffer是一套逻辑清理。池化的:引用计数的领域和jdkGC访问可达分析算法不一样。引用计数值release()到达0的时候就会放回池子里面,访问引用计数值为0的对象就会触发异常。增加引用计数可以使用retain()方法。 https://netty.io/wiki/reference-counted-objects.html衍生buffer和原本的buffer是使用同一个的。引用计数不会变化,所以产生衍生buffer的时候要retain,加1
没有数据的复制,推荐使用
PlatformDependent.hasUnsafe(); 判断一个异常对象是不是null是的话就说明是支持UNSafe的。下面是具体的判断逻辑,可以略过,主要是通过JDK的UNSafe的调用创建了一个直接内存ByteBuffer
返回值 接口ChannelFuture针对Channel的Future接口Channel IO操作的结果,netty里面所有的IO都是异步的,因此会立即返回,得到一个ChannelFuture的实例,可以通过其获取IO操作的状态。可以 addListener() ,对Channel监控,完成时候会通知 ChannelFutureListener 不会阻塞,而不是使用await方法,因为会阻塞直到结果返回会有死锁发生。
池化
池化--》内存池当Netty根据请求的大小将其分成四类:Tiny、Small、Normal、Huge,这四类请求的分界线分别为512B、8KB、16MB为了加快分配内存的速度,Netty还使用了线程缓存,而线程缓存实际上是在有回收内存的情况下才有效,即ReferenceCountUtil.release(byteBuf)。
ChannelHandlerC
Bootstrap 与 ServerBootstrap 是 Netty 程序的引导类,主要用于配置各种参数,并启动整个 Netty 服务
ChannelFutureListener会被事件循环的IO线程调用,如果是耗时的处理方法是一样的
ChannelPipeline
线程
EventLoopGroup和EventLoop:netty的线程模型
是否显式地关闭了 Unsafe,即通过参数 io.netty.noUnsafe 控制的尝试反射访问 Unsafe 中的属性 theUnsafe 和方法 copyMemory,其中,theUnsafe 是我们获得 Unsafe 实例的唯一方法,因为这个类是 Java 核心类,有非常严格的权限控制,我们只能通过这种方式获得其实例。最后,会反射获取 DirectByteBuffer 中的 address 属性,这个 address 是定义在其父类 Buffer 中的。三步都成功了,才能宣判我们可以正确地使用 JDK自己的 Unsafe
分类
sync()
一些属性的设置
ChannelOptionTCP 相关的设置项,只维护key的信息,值在其余地方可以用来配置ChannelConfig初始化ServerBootStrap的时候可以设置,key以及对应的 T value
非池化 堆内存
而且isSuccess是isDone的细化,后者可能是失败null结束的
netty 的是两个指针实现的,不需要flipjdk只是使用了一个指针,最后读取的时候还需要flip反转
随机读写 ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i ++) { byte b = buffer.getByte(i); System.out.println((char) b); }
ServerBootstrap 作为服务端reactor实现,很多的childXxx(),比如,childHandler()、childOption() 等,都是为了设置subReactor。但是,没有 childChannel() 这个方法,因为会根据ServerSocketChannel自动选择对应的类型
callHandlerAdded0(newCtx);见名知意
读取数据 ByteBuf buffer = ...; while (buffer.isReadable()) { System.out.println(buffer.readByte()); }
io.netty.channel.socket.nio public class NioServerSocketChannelextends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel启动器里面指定的是什么channel,创建的时候就会使用反射的工厂去找到指定的class创建什么。而且创建的会创建pipeline。init 的时候会设置 Channel pipeline
一个 Channel 会绑定到一个 EventLoop 上
AbstractChannelHandlerContext
创建分配器:UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);:preferDirect表示是否偏向于使用直接内存用这个分配器来创建一个buf:allocator.heapBuffer(); 创建一个非池化基于堆内存的ByteBuf内部:根据是否可以使用Unsafe创建不同的类型 return PlatformDependent.hasUnsafe() ? new font color=\"#ff0000\
Netty的ByteBuf是如何支持直接内存非池化实现的,都是使用 Java 原生的 DirectByteBuffer默认情况下,Netty 是自己控制直接内存的释放,即使用 DirectByteBuffer 的 NoCleaner 构造方法,且支持使用 Unsafe。如果 DirectByteBuffer 没有 NoCleaner 方法,则使用包含 Cleaner 的构造方法,同时也支持使用 Unsafe。如果显式地说明了不支持使用 Unsafe 的话,那么最后只能交给 JDK 自己来处理了。第一类::UnpooledDirectByteBuf(默认)InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf:在创建分配器 UnpooledByteBufAllocator 的时候会查找 DirectByteBuffer 中是否包含 NoCleaner 的构造方法;在创建 UnpooledDirectByteBuf 的时候默认创建的是其子类 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf,这个类将不使用 Cleaner 来清理垃圾,同时,它的底层使用的是 Java 原生的 DirectByteBuffer,且包含 Unsafe;在清理内存的时候需要手动调用 ReferenceCountUtil.release(byteBuf); 方法,也就是引用计数值-1,为1的时候就清理内存;最终也是调用 Unsafe 的 freeMemory () 来释放内存和JDK的直接内存是是一样的第二类:UnpooledUnsafeDirectByteBuf(没有NoCleaner)DirectByteBuffer 中不包含 NoCleaner 的构造方法,就使用与 Java 原生一样的方式,创建一个包含 Cleaner 的 DirectByteBuffer。InstrumentedUnpooledUnsafeDirectByteBuf 继承自 UnpooledUnsafeDirectByteBuf ,分配内存的时候使用的是ByteBuffer.allocateDirect(initialCapacity);第二类:UnpooledUnsafeDirectByteBuf(指定netty显式不支持使用UnSafe)InstrumentedUnpooledDirectByteBuf 继承自 UnpooledDirectByteBuf,netty不支持,让JDK支持。分配内存的时候使用的ByteBuffer.allocateDirect(initialCapacity);
AttributeMap一个Channel多个Handler数据的共享也是在初始化ServerBootStrap的时候可以设置,key以及对应的 T value
Selector
io.netty.channel public interface Channel所有的IO操作都是异步的可以获取当前连接的状态及配置参数Channel pipeline 是其一个属性,包含了Handler的链条,处理 IO 事件支持父子关系,SocketChannel是ServerSocketChannel产生的,就有一个parent Channel。他们可以共享一个Socket连接,使用Channel之后要释放句柄。
io.netty.channel.DefaultChannelPipeline#font color=\"#ff0000\
public interface ByteBufAllocator { // 默认的分配器,除非显式地配成unpooled,否则使用pooled ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; // 创建一个ByteBuf,看具体的实现方式决定创建的是direct的还是heap的 ByteBuf font color=\"#ff0000\
平台相关,本质是反射读取我徐昂关的属性或方法PlateformDependentPlateformDependent0
newDirectBufferfont color=\"#ff0000\
ReferenceCountUtil.release(byteBuf);byteBuf 其实是 ReferenceCounted,AbstractReferenceCountedByteBuf#release()这个方法将其引用计数减一,默认的引用计数为2,如果减到1的话表示可以清理掉了
ChannelOption
ChannelHandler
ChannelHandlerContext 11
ChannelConfig:Channel持有的自己的属性里面可以设置ChannelOption为key的值是什么
等待超时和连接超时 * // BAD - NEVER DO THIS * {@link Bootstrap} b = ...; * {@link ChannelFuture} f = b.connect(...);font color=\"#ff0000\
FutureTask<String> future = new FutureTask<String>(new Callable<String>() { public String call() { return searcher.search(target); } });executor.execute(future);
进入else可能Future还没有完成
连接超时不应该使用等待超时来配置直接使用连接超时实现
观察者放到一个集合,来的时候遍历集合
写数据 ByteBuf buffer = ...; // 整数大于4字节,可以放下 while (buffer.maxWritableBytes() >= 4) { buffer.writeInt(random.nextInt()); }
ChannelHandlerContext 22
ChannelHandlerContext 1
UnpooledByteBufAllocator 和 PooledByteBufAllocator 类中默认都是偏向于使用 Direct 的方式创建 ByteBuf,PreferHeapByteBufAllocator 和 PreferredDirectByteBufAllocator 中保存了 ByteBufAllocator 的实例,其实真正干活还是交给 UnpooledByteBufAllocator 或者 PooledByteBufAllocator 类来实现的,这是设计模式中的装饰器模式的用法。原则:尽量使用池化,直接内存,unsafe
ChannelHandlerB
interface ArchiveSearcher { String search(String target); } * class App { * ExecutorService executor = ... * ArchiveSearcher searcher = ... * void showSearch(final String target) * throws InterruptedException { * Future<String> future * = executor.submit(new Callable<String>() { * public String call() { * return searcher.search(target); * }}); * displayOtherThings(); // do other things while searching * try { * displayText(future.get()); // use future * } catch (ExecutionException ex) { cleanup(); return; } * } * }
init的时候会调用代码写的设置,没有设置的话就是空的
编写涉及的核心组件
创建分配器:分配器里面有PlatformDependent.hasDirectBufferNoCleanerConstructor();来判断当前的JDK版本是否支持没有Cleaner的DirectByteBuffer,因为netty要自己管理内存的释放,在JDK的NIO里面内存释放时使用的是Cleaner 来做垃圾清理的,也就是将直接内存的清理(OS API的调用时机)和JVM的GC关联起来,通过虚引用的清理间接地控制直接内存的释放所有的 ByteBuf 都继承自 AbstractReferenceCountedByteBuf 类,所以,都可以调用其 release () 方法释放内存用这个分配器来建直接内存实现的 ByteBuf:allocator.directBuffer(),如果支持Unsafe,根据noCleaner的值判断使用哪个子类font color=\"#ff0000\
ChannelFuture
ChannelHandlerContext 2
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;ByteBuf buffer = allocator.buffer(length);buffer.writeXxx(xxx);
ChannelHandlerContext
空间整理
互相引用
address 保存的是直接内存的地址,操作 DirectByteBuffer 的时候实际上是对 address 指向地址的操作,当然,这种操作是通过 unsafe 来执行的;
JDK Buffer 按照数据类型分为 ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer,按照内存实现可以分为堆内存实现和直接内存实现。的基本内容:position、limit、capacity。写模式,position 从 0 开始,表示下一个可写的位置,limit 等于 capacity。读模式,position 重置为 0,表示下一个可读的位置,limit 等于切换之前 position 的位置,capacity 不变。通过 flip () 方法切换为读模式通;clean () 方法或 compact () 方法清除(部分)数据;rewind () 方法重新读取或重新写入数据;buf.put () 或者 channel.read (buf) 方法写入数据;buf.read () 或者 channel.write (buf) 方法读取数据
非池化 直接内存
@Overridepublic ByteBuf writeInt(int value) { // 一个int等于4个字节 // 检查是否可写,里面会做扩容相关的操作, // 最终会调用allocateArray()分配一个新的数组,并把旧数组的数据拷贝到新数组 // 且调用freeArray()把旧数组释放,当然,此时也会改变上面提到的监控的数值 ensureWritable0(4); // 在写索引的位置开始写入值 font color=\"#ff0000\
为什么我们写的Handler不需要并发控制?因为netty实现保证了一个事件循环在整个生命周期只会和一个Thread绑定为什么不能在channelRead0写耗时操作?左边第五点,因为会有成百上千Channel属于同一个事件循环,又是单线程,这不是堵死了吗,性能直线下降。这个时候就要用业务线程池EventExecutor,两种实现:实现一:在channelHandler回调方法使用自己定义的JDK线程池:ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(() -> { });font color=\"#ff0000\
最终是调用的 Unsafe 的 freeMemory () 方法来释放内存的,这与 DirectByteBuffer 的最终结果又是一致的(Deallocator 中调用了 Unsafe 的 freeMemory () 方法)。
引导
协议实现:DatagramChannel:UDP 协议的支持SocketChannel:TCP 协议的支持ServerSocketChannel:TCP 协议的支持SctpChannel:SCTP 协议的支持SctpServerChannel:SCTP 协议的支持RxtxChannel:RXTX 协议的支持,已废弃UdtChannel:UDT 协议的支持,已废弃
管道实际存放的是 new DefaultChannelHandlerContext
0 条评论
下一页