Netty详解
2022-08-08 17:49:19 0 举报
AI智能生成
Netty详解
作者其他创作
大纲/内容
Java I/O的演进之路
Linux网络I/O模型简介
非阻塞I/O模型:recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误,一般都对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来。
非阻塞I/O模型:recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误,一般都对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来。
I/O复用模型:Linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select操作上,这样select/poll可以帮我们侦测多个fd是否处于就绪状态。select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到了一些制约。Linux还提供了一个epoll系统调用,epoll使用基于事件驱动方式代替顺序扫描,因此性能更高。当有fd就绪时,立即回调函数rollback。
信号驱动I/O模型:首先开启套接口信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据准备就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据
异步I/O:告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核复制到用户自己的缓冲区)通知我们。这种模型与信号驱动模型的主要区别是:信号驱动I/O由内核通知我们何时可以开始一个I/O操作;异步I/O模型由内核通知我们I/O操作何时已经完成。
I/O多路复用技术
目前支持I/O多路复用的系统调用有select、pselect、poll、epoll,在Linux网络编程过程中,很长一段时间都使用select做轮询和网络事件通知,然而select的一些固有缺陷导致了它的应用受到了很大的限制,最终Linux不得不在新的内核版本中寻找select的替代方案,最终选择了epoll。
epoll优点
支持一个进程打开的socket描述符(FD)不受限制(仅受限于操作系统的最大文件句柄数)
值得庆幸的是,epoll并没有这个限制,它所支持的FD上限是操作系统的最大文件句柄数,这个数字远远大于1024。例如,在1GB内存的机器上大约是10万个句柄左右,具体的值可以通过cat /proc/sys/fs/file- max察看,通常情况下这个值跟系统的内存关系比较大
I/O效率不会随着FD数目的增加而线性下降
传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,由于网络延时或者链路空闲,任一时刻只有少部分的socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。epoll不存在这个问题,它只会对“活跃”的socket进行操作-这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的,那么,只有“活跃”的socket才会主动的去调用callback函数,其他idle状态socket则不会。在这点上,epoll实现了一个伪AIO。针对epoll和select性能对比的benchmark测试表明:如果所有的socket都处于活跃态-例如一个高速LAN环境,epoll并不比select/poll效率高太多;相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
使用mmap加速内核与用户空间的消息传递
无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存复制就显得非常重要,epoll是通过内核和用户空间mmap同一块内存实现
epoll的API更加简单
Java I/O演进
在JDK 1.4推出Java NIO之前,基于Java的所有Socket通信都采用了同步阻塞模式(BIO)
2002年发布JDK1.4时,NIO以JSR-51的身份正式随JDK发布。它新增了个java.nio包,提供了很多进行异步I/O开发的API和类库
2011年7月28日,JDK1.7正式发布。它的一个比较大的亮点就是将原来的NIO类库进行了升级,被称为NIO2.0。
Java NIO 通信基础详解
详解 NIO Buffer 类及其属性
Buffer类
Buffer类是一个抽象类,对应于Java的主要数据类型,在NIO中有8种缓冲区类,分别如下:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer、MappedByteBuffer
前7种Buffer类型,覆盖了能在IO中传输的所有的Java基本数据类型。第8种类型MappedByteBuffer是专门用于内存映射的一种ByteBuffer类型。
重要属性
前7种Buffer类型,覆盖了能在IO中传输的所有的Java基本数据类型。第8种类型MappedByteBuffer是专门用于内存映射的一种ByteBuffer类型。
除此之外,还有一个标记属性:mark(标记),可以将当前的position临时存入mark中;需要的时候,可以再从mark标记恢复到position位置。
capacity属性
Buffer类的capacity属性,表示内部容量的大小。一旦写入的对象数量超过了capacity容量,缓冲区就满了,不能再写入了。
Buffer类的capacity属性一旦初始化,就不能再改变。原因是什么呢?Buffer类的对象在初始化时,会按照capacity分配内部的内存。在内存分配好之后,它的大小当然就不能改变了。
capacity容量不是指内存块byte[]数组的字节的数量。capacity容量指的是写入的数据对象的数量。
例子:Buffer类是一个抽象类,Java不能直接用来新建对象。使用的时候,必须使用Buffer的某个子类,例如使用DoubleBuffer,则写入的数据是double类型,如果其capacity是100,那么我们最多可以写入100个double数据。
position属性
Buffer类的position属性,表示当前的位置。position属性与缓冲区的读写模式有关。在不同的模式下,position属性的值是不同的。当缓冲区进行读写的模式改变时,position会进行调整。
在写入模式下,position的值变化规则如下:(1)在刚进入到写模式时,position值为0,表示当前的写入位置为从头开始。(2)每当一个数据写到缓冲区之后,position会向后移动到下一个可写的位置。(3)初始的position值为0,最大可写值position为limit-1。当position值达到limit时,缓冲区就已经无空间可写了。
在读模式下,position的值变化规则如下:(1)当缓冲区刚开始进入到读模式时,position会被重置为0。(2)当从缓冲区读取时,也是从position位置开始读。读取数据后,position向前移动到下一个可读的位置。(3)position最大的值为最大可读上限limit,当position达到limit时,表明缓冲区已经无数据可读
当缓冲区处于写入模式时,如果要从缓冲区读取数据,需要进行模式的切换,可以调用 flip() 转换方法,将缓冲区变成读取模式。
在这个flip翻转过程中,position会进行非常巨大的调整,具体的规则是:position由原来的写入位置,变成新的可读位置,也就是0,表示可以从头开始读。flip翻转的另外一半工作,就是要调整limit属性。
limit属性
Buffer类的limit属性,表示读写的最大上限。limit属性,也与缓冲区的读写模式有关。在不同的模式下,limit的值的含义是不同的。
在写模式下,limit属性值的含义为可以写入的数据最大上限。在刚进入到写模式时,limit的值会被设置成缓冲区的capacity容量值,表示可以一直将缓冲区的容量写满。
在读模式下,limit的值含义为最多能从缓冲区中读取到多少数据。
在flip翻转时,属性的调整,将涉及position、limit两个属性
首先,创建缓冲区。刚开始,缓冲区处于写模式。position为0, limit为最大容量。
然后,向缓冲区写数据。每写入一个数据,position向后面移动一个位置,也就是position的值加1。假定写入了5个数,当写入完成后,position的值为5。
这时,使用(即调用)flip方法,将缓冲区切换到读模式。limit的值,先会被设置成写模式时的position值。这里新的limit是5,表示可以读取的最大上限是5个数。同时,新的position会被重置为0,表示可以从0开始读。
mark属性
就是相当一个暂存属性,暂时保存position的值,方便后面的重复使用position值。
mark()方法:mark=position
reset()方法:position=mark
重要方法
allocate()创建缓冲区
IntBuffer是具体的Buffer子类,通过调用IntBuffer.allocate(20),创建了一个Intbuffer实例对象,并且分配了20 * 4个字节的内存空间。
put()写入到缓冲区
写入的数据类型要求与缓冲区的类型保持一致。
flip()翻转
将写入模式翻转成读取模式。
写入数据后,如果需要读取数据,需先转成读取模式
写入数据后,如果需要读取数据,需先转成读取模式
get()从缓冲区读取
读数据很简单,调用get方法,每次从position的位置读取一个数据,并且进行相应的缓冲区属性的调整。
rewind()倒带
已经读完的数据,如果需要再读一遍,可以调用rewind()方法。rewind()也叫倒带,就像播放磁带一样倒回去,再重新播放。
(1)position重置为0,所以可以重读缓冲区中的所有数据。(2)limit保持不变,数据量还是一样的,仍然表示能从缓冲区中读取多少个元素。(3)mark标记被清理,表示之前的临时位置不能再用了。
mark( )和reset( )
Buffer.mark()方法的作用是将当前position的值保存起来,放在mark属性中,让mark属性记住这个临时位置;之后,可以调用Buffer.reset()方法将mark的值恢复到position中。也就是说,Buffer.mark()和Buffer.reset()方法是配套使用的。两种方法都需要内部mark属性的支持。
clear( )清空缓冲区
在读取模式下,调用clear()方法将缓冲区切换为写入模式。此方法会将position清零,limit设置为capacity最大容量值,可以一直写入,直到缓冲区写满。
compact() 方法
将缓冲区模式从读取模式转为写入模式
详解NIO Channel(通道)类
NIO中一个连接就是用一个Channel(通道)来表示。大家知道,从更广泛的层面来说,一个通道可以表示一个底层的文件描述符,例如硬件设备、文件、网络连接等。然而,远远不止如此,除了可以对应到底层文件描述符,Java NIO的通道还可以更加细化。例如,对应不同的网络传输协议类型,在Java中都有不同的NIO Channel(通道)实现。
Channel(通道)的主要类型
这里不对纷繁复杂的Java NIO通道类型进行过多的描述,仅仅聚焦于介绍其中最为重要的四种Channel(通道)实现:FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel。
FileChannel文件通道,用于文件的数据读写。
读取FileChannel通道:在大部分应用场景,从通道读取数据都会调用通道的int read(ByteBufferbuf)方法,它从通道读取到数据写入到ByteBuffer缓冲区,并且返回读取到的数据量。
写入FileChannel通道:写入数据到通道,在大部分应用场景,都会调用通道的int write(ByteBufferbuf)方法。此方法的参数——ByteBuffer缓冲区,是数据的来源。write方法的作用,是从ByteBuffer缓冲区中读取数据,然后写入到通道自身,而返回值是写入成功的字节数。
关闭通道:当通道使用完成后,必须将其关闭。关闭非常简单,调用close方法即可。
强制刷新到磁盘:在将缓冲区写入通道时,出于性能原因,操作系统不可能每次都实时将数据写入磁盘。如果需要保证写入通道的缓冲数据,最终都真正地写入磁盘,可以调用FileChannel的force()方法。
FileChannel文件通道只有阻塞模式,不能切换到非阻塞模式
SocketChannel套接字通道,用于Socket套接字TCP连接的数据读写。
ServerSocketChannel服务器嵌套字通道(或服务器监听通道),允许我们监听TCP连接请求,为每个监听到的请求,创建一个SocketChannel套接字通道。
DatagramChannel数据报通道,用于UDP协议的数据读写。
详解NIO Selector选择器
Java NIO的三大核心组件:Channel(通道)、Buffer(缓冲区)、Selector(选择器)。其中通道和缓冲区,二者的联系也比较密切:数据总是从通道读到缓冲区内,或者从缓冲区写入到通道中。
选择器以及注册
简单地说:选择器的使命是完成IO的多路复用。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系,是监控和被监控的关系。选择器提供了独特的API方法,能够选出(select)所监控的通道拥有哪些已经准备好的、就绪的IO操作事件。
一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道。通过选择器,一个单线程可以处理数百、数千、数万、甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销。
通道和选择器之间的关系,通过register(注册)的方式完成。调用通道的Channel.register(Selector sel, int ops)方法,可以将通道实例注册到一个选择器中。register方法有两个参数:第一个参数,指定通道注册到的选择器实例(一定要是继承了 SelectableChannel的通道,如 ServerSocketChannel和SocketChannel,像FileChannel就不行,他是阻塞的并且没有继承SelectableChannel);第二个参数,指定选择器要监控的IO事件类型。
通道IO事件
可供选择器监控的通道IO事件类型,包括以下四种:(1)可读:SelectionKey.OP_READ(2)可写:SelectionKey.OP_WRITE(3)连接:SelectionKey.OP_CONNECT(4)接收:SelectionKey.OP_ACCEPT
事件类型的定义在SelectionKey类中。如果选择器要监控通道的多种事件,可以用“按位或”运算符来实现。
int key = SelectionKey.OP_ACCEPT | SelectionKey.OP_WRITE
int key = SelectionKey.OP_ACCEPT | SelectionKey.OP_WRITE
什么是IO事件呢?这个概念容易混淆,这里特别说明一下。这里的IO事件不是对通道的IO操作,而是通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件。
SelectionKey选择键
通道和选择器的监控关系注册成功后,就可以选择就绪事件。具体的选择工作,和调用选择器Selector的select()方法来完成。通过select方法,选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的感兴趣的那些IO事件。换句话说,一旦在通道中发生了某些IO事件(就绪状态达成),并且是在选择器中注册过的IO事件,就会被选择器选中,并放入SelectionKey选择键的集合中。
SelectionKey选择键就是那些被选择器选中的IO事件。前面讲到,一个IO事件发生(就绪状态达成)后,如果之前在选择器中注册过,就会被选择器选中,并放入SelectionKey选择键集合中;如果之前没有注册过,即使发生了IO事件,也不会被选择器选中。SelectionKey选择键和IO的关系,可以简单地理解为:选择键,就是被选中了的IO事件。
选择键的功能是很强大的。通过SelectionKey选择键,不仅仅可以获得通道的IO事件类型,比方说SelectionKey.OP_READ;还可以获得发生IO事件所在的通道;另外,也可以获得选出选择键的选择器实例。
即:一个 SelectionKey 会于一个 SelectableChannel 绑定
即:一个 SelectionKey 会于一个 SelectableChannel 绑定
选择器使用流程
第一步:获取选择器实例
选择器实例是通过调用静态工厂方法open()来获取的
Selector selector = Selector.open();
Selector选择器的类方法open()的内部,是向选择器SPI(SelectorProvider)发出请求,通过默认的SelectorProvider(选择器提供者)对象,获取一个新的选择器实例。Java中SPI全称为(Service Provider Interface,服务提供者接口),是JDK的一种可以扩展的服务提供和发现机制。Java通过SPI的方式,提供选择器的默认实现版本。也就是说,其他的服务提供商可以通过SPI的方式,提供定制化版本的选择器的动态替换或者扩展。
第二步:将通道注册到选择器实例
要实现选择器管理通道,需要将通道注册到相应的选择器上
调用通道的register()方法,将ServerSocketChannel通道注册到了一个选择器上。当然,在注册之前,首先要准备好通道。
调用通道的register()方法,将ServerSocketChannel通道注册到了一个选择器上。当然,在注册之前,首先要准备好通道。
注册到选择器的通道,必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常。这意味着,FileChannel文件通道不能与选择器一起使用,因为FileChannel文件通道只有阻塞模式,不能切换到非阻塞模式;而Socket套接字相关的所有通道都可以。
一个通道,并不一定要支持所有的四种IO事件。例如服务器监听通道ServerSocketChannel,仅仅支持Accept(接收到新连接)IO事件;而SocketChannel传输通道,则不支持Accept(接收到新连接)IO事件。
可以在注册之前,可以通过通道的validOps()方法,来获取该通道所有支持的IO事件集合。
可以在注册之前,可以通过通道的validOps()方法,来获取该通道所有支持的IO事件集合。
第三步:选出感兴趣的IO就绪事件(选择键集合)
通过Selector选择器的select()方法,选出已经注册的、已经就绪的IO事件,保存到SelectionKey选择键集合中。SelectionKey集合保存在选择器实例内部,是一个元素为SelectionKey类型的集合(Set)。调用选择器的selectedKeys()方法,可以取得选择键集合。
接下来,需要迭代集合的每一个选择键,根据具体IO事件类型,执行对应的业务操作。
接下来,需要迭代集合的每一个选择键,根据具体IO事件类型,执行对应的业务操作。
处理完成后,需要将选择键从这个SelectionKey集合中移除,防止下一次循环的时候,被重复的处理。SelectionKey集合不能添加元素,如果试图向SelectionKey选择键集合中添加元素,则将抛出java.lang.UnsupportedOperationException异常。
使用 Iterator.remove() 方法。
使用 Iterator.remove() 方法。
用于选择就绪的IO事件的select()方法,有多个重载的实现版本
select():阻塞调用,一直到至少有一个通道发生了注册的IO事件。
select()方法返回的整数值(int整数类型),表示发生了IO事件的通道数量。更准确地说,是从上一次select到这一次select之间,有多少通道发生了IO事件。强调一下,select()方法返回的数量,指的是通道数,而不是IO事件数,准确地说,是指发生了选择器感兴趣的IO事件的通道数。
select()方法返回的整数值(int整数类型),表示发生了IO事件的通道数量。更准确地说,是从上一次select到这一次select之间,有多少通道发生了IO事件。强调一下,select()方法返回的数量,指的是通道数,而不是IO事件数,准确地说,是指发生了选择器感兴趣的IO事件的通道数。
select(long timeout):和select()一样,但最长阻塞时间为timeout指定的毫秒数。
selectNow():非阻塞,不管有没有IO事件,都会立刻返回
与Java OIO相比,Java NIO编程大致的特点如下:
(1)在NIO中,服务器接收新连接的工作,是异步进行的。不像Java的OIO那样,服务器监听连接,是同步的、阻塞的。NIO可以通过选择器(也可以说成:多路复用器),后续不断地轮询选择器的选择键集合,选择新到来的连接。
(2)在NIO中,SocketChannel传输通道的读写操作都是异步的。如果没有可读写的数据,负责IO通信的线程不会同步等待。这样,线程就可以处理其他连接的通道;不需要像OIO那样,线程一直阻塞,等待所负责的连接可用为止。
(3)在NIO中,一个选择器线程可以同时处理成千上万个客户端连接,性能不会随着客户端的增加而线性下降。
总之,有了Linux底层的epoll支持,有了Java NIO Selector选择器这样的应用层IO复用技术,Java程序从而可以实现IO通信的高TPS、高并发,使服务器具备并发数十万、数百万的连接能力。Java的NIO技术非常适合用于高性能、高负载的网络服务器。鼎鼎大名的通信服务器中间件Netty,就是基于Java的NIO技术实现的。
Reactor反应器模式
简介
反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:
(1)Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。
(2)Handlers处理器的职责:非阻塞的执行业务处理逻辑。
(1)Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。
(2)Handlers处理器的职责:非阻塞的执行业务处理逻辑。
在这里,为了方便大家学习,将Doug Lea著名的文章《Scalable IO in Java》的链接地址贴出来:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf,建议大家去阅读一下,提升自己的基础知识,开阔下眼界。
多线程OIO的致命缺陷
死循环
在Java的OIO编程中,最初和最原始的网络服务器程序,是用一个while循环,不断地监听端口是否有新的连接。如果有,那么就调用一个处理函数来处理
这种方法的最大问题是:如果前一个网络连接的handle(socket)没有处理完,那么后面的连接请求没法被接收,于是后面的请求通通会被阻塞住,服务器的吞吐量就太低了。对于服务器来说,这是一个严重的问题。
这种方法的最大问题是:如果前一个网络连接的handle(socket)没有处理完,那么后面的连接请求没法被接收,于是后面的请求通通会被阻塞住,服务器的吞吐量就太低了。对于服务器来说,这是一个严重的问题。
Connection Per Thread
为了解决这个严重的连接阻塞问题,出现了一个极为经典模式:Connection Per Thread(一个线程处理一个连接)模式。
对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的输入和输出。当然,服务器的监听线程也是独立的,任何的socket连接的输入和输出处理,不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器,就是这样实现的。
优缺点
优点:解决了前面的新连接被严重阻塞的问题,在一定程度上,极大地提高了服务器的吞吐量。
缺点:对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。在系统中,线程是比较昂贵的系统资源。如果线程数太多,系统无法承受。而且,线程的反复创建、销毁、线程的切换也需要代价。因此,在高并发的应用场景下,多线程OIO的缺陷是致命的。
单线程的Reactor反应器模式
在事件驱动模式中,当有事件触发时,事件源会将事件dispatch分发到handler处理器进行事件处理。反应器模式中的反应器角色,类似于事件驱动模式中的dispatcher事件分发器角色。
两个重要组件
1)Reactor反应器:负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中选择器监控的通道IO事件。
2)Handler处理器:与IO事件(或者选择键)绑定,负责IO事件的处理。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。
简单地说,Reactor反应器和Handers处理器处于一个线程中执行。
基于Java NIO,如何实现简单的单线程版本的反应器模式呢?需要用到SelectionKey选择键的几个重要的成员方法:
方法一:void attach(Object o)此方法可以将任何的Java POJO对象,作为附件添加到SelectionKey实例,相当于附件属性的setter方法。这方法非常重要,因为在单线程版本的反应器模式中,需要将Handler处理器实例,作为附件添加到SelectionKey实例。
方法二:Object attachment()此方法的作用是取出之前通过attach(Object o)添加到SelectionKey选择键实例的附件,相当于附件属性的getter方法,与attach(Object o)配套使用。
总之,在反应器模式中,需要进行attach和attachment结合使用:在选择键注册完成之后,调用attach方法,将Handler处理器绑定到选择键;当事件发生时,调用attachment方法,可以从选择键取出Handler处理器,将事件分发到Handler处理器中,完成业务处理。
单线程Reactor反应器模式,是基于Java的NIO实现的。相对于传统的多线程OIO,反应器模式不再需要启动成千上万条线程,效率自然是大大提升了。
缺点
在单线程反应器模式中,Reactor反应器和Handler处理器,都执行在同一条线程上。这样,带来了一个问题:当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,如果被阻塞的Handler不仅仅负责输入和输出处理的业务,还包括负责连接监听的AcceptorHandler处理器。这个是非常严重的问题。
多线程的Reactor反应器模式
多线程池Reactor反应器的演进,分为两个方面:
1)首先是升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以考虑使用线程池。
将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听受到阻塞。
将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听受到阻塞。
2)其次是升级Reactor反应器。可以考虑引入多个Selector选择器,提升选择大量通道的能力。
如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。
如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。
优缺点
优点
· 响应快,虽然同一反应器线程本身是同步的,但不会被单个连接的同步IO所阻塞;
· 编程相对简单,最大程度避免了复杂的多线程同步,也避免了多线程的各个进程之间切换的开销;
· 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。
· 编程相对简单,最大程度避免了复杂的多线程同步,也避免了多线程的各个进程之间切换的开销;
· 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。
缺点
· 反应器模式增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
· 反应器模式需要操作系统底层的IO多路复用的支持,如Linux中的epoll。如果操作系统的底层不支持IO多路复用,反应器模式不会有那么高效。
· 同一个Handler业务线程中,如果出现一个长时间的数据读写,会影响这个反应器中其他通道的IO处理。例如在大文件传输时,IO操作就会影响其他客户端(Client)的响应时间。因而对于这种操作,还需要进一步对反应器模式进行改进。
· 反应器模式需要操作系统底层的IO多路复用的支持,如Linux中的epoll。如果操作系统的底层不支持IO多路复用,反应器模式不会有那么高效。
· 同一个Handler业务线程中,如果出现一个长时间的数据读写,会影响这个反应器中其他通道的IO处理。例如在大文件传输时,IO操作就会影响其他客户端(Client)的响应时间。因而对于这种操作,还需要进一步对反应器模式进行改进。
并发基础中的Future异步回调模式
join异步阻塞
join方法的应用场景:A线程调用B线程的join方法,等待B线程执行完成;在B线程没有完成前,A线程阻塞。
有三个重载版本
void join():A线程等待B线程执行结束后,A线程重新恢复执行。
void join(long millis):A线程等待B线程执行一段时间,最长等待时间为millis毫秒。超过millis毫秒后,不论B线程是否结束,A线程重新恢复执行。
void join(long millis, int nanos):等待B线程执行一段时间,最长等待时间为millis毫秒,加nanos纳秒。超过时间后,不论B线程是否结束,A线程重新恢复执行。
强调一下容易混淆的几点:
(1)join是实例方法,不是静态方法,需要使用线程对象去调用,如thread.join()。
(2)join调用时,不是线程所指向的目标线程阻塞,而是当前线程阻塞。
(3)只有等到当前线程所指向的线程执行完成,或者超时,当前线程才能重新恢复执行。
(2)join调用时,不是线程所指向的目标线程阻塞,而是当前线程阻塞。
(3)只有等到当前线程所指向的线程执行完成,或者超时,当前线程才能重新恢复执行。
join有一个问题:被合并的线程没有返回值。例如,在烧水的实例中,如果烧水线程的执行结束,main线程是无法知道结果的。同样,清洗线程的执行结果,main线程也是无法知道的。形象地说,join线程合并就是一像一个闷葫芦。只能发起合并线程,不能取到执行结果。
FutureTask异步回调之重武器
Callable接口
在介绍Callable接口之前,先得重提一下旧的Runnable接口。Runnable接口是在Java多线程中表示线程的业务代码的抽象接口。但是,Runnable有一个重要的问题,它的run方法是没有返回值的。正因为如此,Runnable不能用于需要有返回值的应用场景。
为了解决Runnable接口的问题,Java定义了一个新的和Runnable类似的接口—— Callable接口。并且将其中的代表业务处理的方法命名为call, call方法有返回值。
Callable接口是一个泛型接口,也声明为了“函数式接口”。其唯一的抽象方法call有返回值,返回值的类型为泛型形参的实际类型。call抽象方法还有一个Exception的异常声明,容许方法内部的异常不经过捕获。
Callable接口与Runnable接口相比,还有一个很大的不同:Callable接口的实例不能作为Thread线程实例的target来使用;而Runnable接口实例可以作为Thread线程实例的target构造参数,开启一个Thread线程。
FutureTask类
FutureTask类代表一个未来执行的任务,表示新线程所执行的操作。FutureTask类也位于java.util.concurrent包中。FutureTask类的构造函数的参数为Callable类型,实际上是对Callable类型的二次封装,可以执行Callable的call方法。FutureTask类间接地继承了Runnable接口,从而可以作为Thread实例的target执行目标。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
总体来说,FutureTask类首先是一个搭桥类的角色,FutureTask类能当作Thread线程去执行目标target,被异步执行;其次,如果要获取异步执行的结果,需要通过FutureTask类的方法去获取,在FutureTask类的内部,会将Callable的call方法的真正结果保存起来,以供外部获取。
在Java语言中,将FutureTask类的一系列操作,抽象出来作为一个重要的接口——Future接口。当然,FutureTask类也实现了此接口。
重点成员变量
private Callable<V> callable
FutureTask内部有一个Callable类型的成员,代表异步执行的逻辑
callable实例属性必须要在FutureTask类的实例构造时进行初始化
private Object outcome
outcome属性所保存的结果,可供FutureTask类的结果获取方法(如get)来获取。
Future接口
主要提供三大功能
(1)判断并发任务是否执行完成。
(2)获取并发的任务完成后的结果。
(3)取消并发执行中的任务。
(2)获取并发的任务完成后的结果。
(3)取消并发执行中的任务。
源码
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
V get():获取并发任务执行的结果。注意,这个方法是阻塞性的。如果并发任务没有执行完成,调用此方法的线程会一直阻塞,直到并发任务执行完成。
V get(Long timeout, TimeUnit unit):获取并发任务执行的结果。也是阻塞性的,但是会有阻塞的时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。
booleanisDone():获取并发任务的执行状态。如果任务执行结束,则返回true。
booleanisCancelled():获取并发任务的取消状态。如果任务完成前被取消,则返回true。
boolean cancel(booleanmayInterruptRunning):取消并发任务的执行。
问题
FutureTask 虽然比 join 方法要高明一点,可以获取到异步线程的执行结果,但是获取结果的get()方法也是一个阻塞方法。
异步阻塞的效率往往是比较低的,被阻塞的主线程不能干任何事情,唯一能干的,就是在傻傻地等待。原生Java API,除了阻塞模式的获取结果外,并没有实现非阻塞的异步结果获取方法。如果需要用到获取异步的结果,则需要引入一些额外的框架,这里首先介绍谷歌公司的Guava框架。
Guava的异步回调模式
Guava的主要手段是增强而不是另起炉灶。为了实现非阻塞获取异步线程的结果,Guava对Java的异步回调机制,做了以下的增强:
(1)引入了一个新的接口ListenableFuture,继承了Java的Future接口,使得Java的Future异步任务,在Guava中能被监控和获得非阻塞异步执行的结果。
(2)引入了一个新的接口FutureCallback,这是一个独立的新接口。该接口的目的,是在异步任务执行完成后,根据异步结果,完成不同的回调处理,并且可以处理异步结果。
详解FutureCallback
FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。
FutureCallback拥有两个回调方法:
(1)onSuccess方法,在异步任务执行成功后被回调;调用时,异步任务的执行结果,作为onSuccess方法的参数被传入。
(2)onFailure方法,在异步任务执行过程中,抛出异常时被回调;调用时,异步任务所抛出的异常,作为onFailure方法的参数被传入
FutureCallback拥有两个回调方法:
(1)onSuccess方法,在异步任务执行成功后被回调;调用时,异步任务的执行结果,作为onSuccess方法的参数被传入。
(2)onFailure方法,在异步任务执行过程中,抛出异常时被回调;调用时,异步任务所抛出的异常,作为onFailure方法的参数被传入
FutureCallback 与 Callable 区别:
(1)Java的Callable接口,代表的是异步执行的逻辑。
(2)Guava的FutureCallback接口,代表的是Callable异步逻辑执行完成之后,根据成功或者异常两种情况,所需要执行的善后工作。
(1)Java的Callable接口,代表的是异步执行的逻辑。
(2)Guava的FutureCallback接口,代表的是Callable异步逻辑执行完成之后,根据成功或者异常两种情况,所需要执行的善后工作。
详解ListenableFuture
Guava如何实现异步任务Callable和FutureCallback结果回调之间的监控关系呢? Guava引入了一个新接口ListenableFuture,它继承了Java的Future接口,增强了监控的能力。
istenableFuture仅仅增加了一个方法——addListener方法。它的作用就是将前一小节的FutureCallback善后回调工作,封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行善后处理。
在实际编程中,如何将FutureCallback回调逻辑绑定到异步的ListenableFuture任务呢?可以使用Guava的Futures工具类,它有一个addCallback静态方法,可以将FutureCallback的回调实例绑定到ListenableFuture异步任务。
总结一下,Guava异步回调的流程如下:
第1步:实现Java的Callable接口,创建异步执行逻辑。还有一种情况,如果不需要返回值,异步执行逻辑也可以实现Java的Runnable接口。
第2步:创建Guava线程池。
第3步:将第1步创建的Callable/Runnable异步执行逻辑的实例,通过submit提交到Guava线程池,从而获取ListenableFuture异步任务实例。
第4步:创建FutureCallback回调实例,通过Futures.addCallback将回调实例绑定到ListenableFuture异步任务上。
第1步:实现Java的Callable接口,创建异步执行逻辑。还有一种情况,如果不需要返回值,异步执行逻辑也可以实现Java的Runnable接口。
第2步:创建Guava线程池。
第3步:将第1步创建的Callable/Runnable异步执行逻辑的实例,通过submit提交到Guava线程池,从而获取ListenableFuture异步任务实例。
第4步:创建FutureCallback回调实例,通过Futures.addCallback将回调实例绑定到ListenableFuture异步任务上。
Netty回调模式
Netty官方文档中指出Netty的网络操作都是异步的。在Netty源代码中,大量使用了异步回调处理模式。在Netty的业务开发层面,Netty应用的Handler处理器中的业务处理代码,也都是异步执行的。
Netty对JavaFuture异步任务的扩展如下
(1)继承Java的Future接口,得到了一个新的属于Netty自己的Future异步任务接口;该接口对原有的接口进行了增强(判断执行状态、增加/删除回调监听等),使得Netty异步任务能够以非阻塞的方式处理回调的结果;注意,Netty没有修改Future的名称,只是调整了所在的包名,Netty的Future类的包名和Java的Future接口的包名不同。
(2)引入了一个新接口——GenericFutureListener,用于表示异步执行完成的监听器。这个接口和Guava的FutureCallbak回调接口不同。Netty使用了监听器的模式,异步任务的执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的GenericFutureListener监听器接口加入Netty异步任务Future中,实现对异步任务执行状态的事件监听。
总体上说,在异步非阻塞回调的设计思路上,Netty和Guava的思路是一致的。对应关系为:
· Netty的Future接口,可以对应到Guava的ListenableFuture接口。
· Netty的GenericFutureListener接口,可以对应到Guava的FutureCallback接口。
· Netty的Future接口,可以对应到Guava的ListenableFuture接口。
· Netty的GenericFutureListener接口,可以对应到Guava的FutureCallback接口。
Netty原理与基础
解密Netty中的Reactor反应器模式
一个IO事件从操作系统底层产生后,在Reactor反应器模式中的处理流程
第1步:通道注册。IO源于通道(Channel)。IO是和通道(对应于底层连接而言)强相关的。一个IO事件,一定属于某个通道。但是,如果要查询通道的事件,首先要将通道注册到选择器。只需通道提前注册到Selector选择器即可,IO事件会被选择器查询到。
第2步:查询选择。在反应器模式中,一个反应器(或者SubReactor子反应器)会负责一个线程;不断地轮询,查询选择器中的IO事件(选择键)。
第3步:事件分发。如果查询到IO事件,则分发给与IO事件有绑定关系的Handler业务处理器。
第4步:完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。
第2步:查询选择。在反应器模式中,一个反应器(或者SubReactor子反应器)会负责一个线程;不断地轮询,查询选择器中的IO事件(选择键)。
第3步:事件分发。如果查询到IO事件,则分发给与IO事件有绑定关系的Handler业务处理器。
第4步:完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。
Netty中的Channel通道组件
Netty中的每一种协议的通道,都有NIO(异步IO)和OIO(阻塞式IO)两个版本
对应于不同的协议,Netty中常见的通道类型如下
· NioSocketChannel:异步非阻塞TCP Socket传输通道。
· NioServerSocketChannel:异步非阻塞TCP Socket服务器端监听通道。
· NioDatagramChannel:异步非阻塞的UDP传输通道。
· NioSctpChannel:异步非阻塞Sctp传输通道。
· NioSctpServerChannel:异步非阻塞Sctp服务器端监听通道。
· OioSocketChannel:同步阻塞式TCP Socket传输通道。
· OioServerSocketChannel:同步阻塞式TCP Socket服务器端监听通道。
· OioDatagramChannel:同步阻塞式UDP传输通道。
· OioSctpChannel:同步阻塞式Sctp传输通道。
· OioSctpServerChannel:同步阻塞式Sctp服务器端监听通道。
· NioServerSocketChannel:异步非阻塞TCP Socket服务器端监听通道。
· NioDatagramChannel:异步非阻塞的UDP传输通道。
· NioSctpChannel:异步非阻塞Sctp传输通道。
· NioSctpServerChannel:异步非阻塞Sctp服务器端监听通道。
· OioSocketChannel:同步阻塞式TCP Socket传输通道。
· OioServerSocketChannel:同步阻塞式TCP Socket服务器端监听通道。
· OioDatagramChannel:同步阻塞式UDP传输通道。
· OioSctpChannel:同步阻塞式Sctp传输通道。
· OioSctpServerChannel:同步阻塞式Sctp服务器端监听通道。
在Netty的NioSocketChannel内部封装了一个Java NIO的SelectableChannel成员。通过这个内部的Java NIO通道,Netty的NioSocketChannel通道上的IO操作,最终会落地到Java NIO的SelectableChannel底层通道。
Netty中的Reactor反应器
在反应器模式中,一个反应器(或者SubReactor子反应器)会负责一个事件处理线程,不断地轮询,通过Selector选择器不断查询注册过的IO事件(选择键)。如果查询到IO事件,则分发给Handler业务处理器。
Netty中的反应器有多个实现类,与Channel通道类有关系。对应于NioSocketChannel通道,Netty的反应器类为:NioEventLoop。
NioEventLoop类绑定了两个重要的Java成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。
NioEventLoop和前面章节讲到反应器,在思路上是一致的:一个NioEventLoop拥有一个Thread线程,负责一个Java NIO Selector选择器的IO事件轮询。
一个反应器和通道之间是一对多的关系:一个反应器可以查询很多各通道的IO事件
Netty中的Handler处理器
Netty的Handler处理器分为两大类:第一类是ChannelInboundHandler通道入站处理器;第二类是ChannelOutboundHandler通道出站处理器。二者都继承了ChannelHandler处理器接口。
ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter,叫作通道入站处理适配器。ChanneOutboundHandler的默认实现为ChanneloutBoundHandlerAdapter,叫作通道出站处理适配器。这两个默认的通道处理适配器,分别实现了入站操作和出站操作的基本功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。
通道和Handler处理器实例之间是多对多的关系:一个通道的IO事件被多个Handler实例处理;一个Handler处理器实例也能绑定到很多的通道,处理多个通道的IO事件。
Netty的流水线
通道和Handler处理器实例之间的绑定关系,Netty是如何组织的?
Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线),它像一条管道,将绑定到一个通道的多个Handler处理器实例,串在一起,形成一条流水线。ChannelPipeline(通道流水线)的默认实现,实际上被设计成一个双向链表。所有的Handler处理器实例被包装成了双向链表的节点,被加入到了ChannelPipeline(通道流水线)中。
Netty是这样规定的:入站处理器Handler的执行次序,是从前到后;出站处理器Handler的执行次序,是从后到前。总之,IO事件在流水线上的执行次序,与IO事件的类型是有关系的
除了流动的方向与IO操作的类型有关之外,流动过程中经过的处理器节点的类型,也是与IO操作的类型有关。入站的IO操作只会且只能从Inbound入站处理器类型的Handler流过;出站的IO操作只会且只能从Outbound出站处理器类型的Handler流过。
入站
Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线),它像一条管道,将绑定到一个通道的多个Handler处理器实例,串在一起,形成一条流水线。ChannelPipeline(通道流水线)的默认实现,实际上被设计成一个双向链表。所有的Handler处理器实例被包装成了双向链表的节点,被加入到了ChannelPipeline(通道流水线)中。
在向后流动的过程中,会出现3种情况:
(1)如果后面还有其他Handler入站处理器,那么IO事件可以交给下一个Handler处理器,向后流动。
(2)如果后面没有其他的入站处理器,这就意味着这个IO事件在此次流水线中的处理结束了。
(3)如果在流水线中间需要终止流动,可以选择不将IO事件交给下一个Handler处理器,流水线的执行也被终止了。
(1)如果后面还有其他Handler入站处理器,那么IO事件可以交给下一个Handler处理器,向后流动。
(2)如果后面没有其他的入站处理器,这就意味着这个IO事件在此次流水线中的处理结束了。
(3)如果在流水线中间需要终止流动,可以选择不将IO事件交给下一个Handler处理器,流水线的执行也被终止了。
详解Bootstrap启动器类
Bootstrap类是Netty提供的一个便利的工厂类,可以通过它来完成Netty的客户端或服务器端的Netty组件的组装,以及Netty程序的初始化。当然,Netty的官方解释是,完全可以不用这个Bootstrap启动器。但是,一点点去手动创建通道、完成各种设置和启动、并且注册到EventLoop,这个过程会非常麻烦。通常情况下,还是使用这个便利的Bootstrap工具类会效率更高。
启动器分为服务器启动类,ServerBootstrap 和客户端启动类,Bootstarp
父子通道
每一个NioSocketChannel通道锁封装的是Java NIO通道,再往下就对应到了操作系统底层的socket描述符
理论上,操作系统底层的socket描述符分为两类:
连接监听类型。连接监听类型的socket描述符,放在服务器端,它负责接收客户端的套接字连接;在服务器端,一个“连接监听类型”的socket描述符可以接受(Accept)成千上万的传输类的socket描述符。
传输数据类型。数据传输类的socket描述符负责传输数据。同一条TCP的Socket传输链路,在服务器和客户端,都分别会有一个与之相对应的数据传输类型的socket描述符。
在Netty中,异步非阻塞的服务器端监听通道NioServerSocketChannel,封装在Linux底层的描述符,是“连接监听类型”socket描述符;而NioSocketChannel异步非阻塞TCP Socket传输通道,封装在底层Linux的描述符,是“数据传输类型”的socket描述符。
在Netty中,将有接收关系的NioServerSocketChannel和NioSocketChannel,叫作父子通道。其中,NioServerSocketChannel负责服务器连接监听和接收,也叫父通道(Parent Channel)。对应于每一个接收到的NioSocketChannel传输类通道,也叫子通道(Child Channel)。
EventLoogGroup线程组
多线程版本的反应器模式
在Netty中,一个EventLoop相当于一个子反应器(SubReactor)。大家已经知道,一个NioEventLoop子反应器拥有了一个线程,同时拥有一个Java NIO选择器。Netty如何组织外层的反应器呢?答案是使用EventLoopGroup线程组。多个EventLoop线程组成一个EventLoopGroup线程组。
反过来说,Netty的EventLoopGroup线程组就是一个多线程版本的反应器。而其中的单个EventLoop线程对应于一个子反应器(SubReactor)。
反过来说,Netty的EventLoopGroup线程组就是一个多线程版本的反应器。而其中的单个EventLoop线程对应于一个子反应器(SubReactor)。
默认的EventLoopGroup内部线程数为最大可用的CPU处理器数量的2倍
服务端两个EventLoogGroup线程组
在服务器端,一般有两个独立的反应器,一个反应器负责新连接的监听和接受,另一个反应器负责IO事件处理。对应到Netty服务器程序中,则是设置两个EventLoopGroup线程组,一个EventLoopGroup负责新连接的监听和接受,一个EventLoopGroup负责IO事件处理。
负责新连接的监听和接受的EventLoopGroup线程组,查询父通道(NioServerSocketChannel)的IO事件,称为Boss线程组。负责查询所有子通道(NioSocketChannel)的IO事件,并且执行Handler处理器中的业务逻辑,称为Worker线程组。
Bootstrap启动流程
第1步:创建反应器线程组,并赋值给ServerBootstrap启动器实例
不一定非得配置两个线程组,可以仅配置一个EventLoopGroup反应器线程组。具体的配置方法是调用b.group( workerGroup)。在这种模式下,连接监听IO事件和数据传输IO事件可能被挤在了同一个线程中处理。这样会带来一个风险:新连接的接受被更加耗时的数据传输或者业务处理所阻塞。
在服务器端,建议设置成两个线程组的工作模式。
第2步:设置通道的IO类型
Netty不止支持Java NIO,也支持阻塞式的OIO(也叫BIO, Block-IO,即阻塞式IO)。
第3步:设置监听端口
第4步:设置传输通道的配置选项
给父通道设置一些通道选项,使用Bootstrap.option() 方法;给子通道设置就使用Bootstrap.childOption() 方法。
常见选项
SO_RCVBUF, SO_SNDBUF
此为TCP参数。每个TCP socket(套接字)在内核中都有一个发送缓冲区和一个接收缓冲区,这两个选项就是用来设置TCP连接的这两个缓冲区大小的。TCP的全双工的工作模式以及TCP的滑动窗口便是依赖于这两个独立的缓冲区及其填充的状态。
TCP_NODELAY
此为TCP参数。表示立即发送数据,默认值为True(Netty默认为True,而操作系统默认为False)。该值用于设置Nagle算法的启用,该算法将小的碎片数据连接成更大的报文(或数据包)来最小化所发送报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输的延时。说明一下:这个参数的值,与是否开启Nagle算法是相反的,设置为true表示关闭,设置为false表示开启。通俗地讲,如果要求高实时性,有数据发送时就立刻发送,就设置为true,如果需要减少发送次数和减少网络交互次数,就设置为false。
SO_KEEPALIVE
此为TCP参数。表示底层TCP协议的心跳机制。true为连接保持心跳,默认值为false。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
SO_REUSEADDR
此为TCP参数。设置为true时表示地址复用,默认值为false。有四种情况需要用到这个参数设置:· 当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而我们希望启动的程序的socket2要占用该地址和端口。例如在重启服务且保持先前端口时。· 有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。· 单个进程绑定相同的端口到多个socket(套接字)上,但每个socket绑定的IP地址不同。· 完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。
SO_LINGER
此为TCP参数。表示关闭socket的延迟时间,默认值为-1,表示禁用该功能。-1表示socket.close()方法立即返回,但操作系统底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,操作系统放弃发送缓冲区的数据,直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞,直到延迟时间到来、发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。
SO_BACKLOG
此为TCP参数。表示服务器端接收连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,在Windows中为200,其他操作系统为128。如果连接建立频繁,服务器处理新连接较慢,可以适当调大这个参数.
SO_BROADCAST
此为TCP参数。表示设置广播模式。
第5步:装配子通道的Pipeline流水线
每一个通道的子通道,都用一条ChannelPipeline流水线。它的内部有一个双向的链表。装配流水线的方式是:将业务处理器ChannelHandler实例加入双向链表中。
装配子通道的Handler流水线调用ServerBootstrap.childHandler()方法,传递一个ChannelInitializer通道初始化类的实例。在父通道成功接收一个连接,并创建成功一个子通道后,就会初始化子通道,这里配置的ChannelInitializer实例就会被调用。
在ChannelInitializer通道初始化类的实例中,有一个initChannel初始化方法,在子通道创建后会被执行到,向子通道流水线增加业务处理器。
ps:也可以为父通道装配流水线,但是没必要,父通道的内部业务处理是固定的。真的需要,也可以使用 ServerBootstrap.handler方法。
第6步:开始绑定服务器新连接的监听端口
调用 Bootstrap.bind() 方法进行端口绑定,方法返回channelFuture。可以调用sync()方法进行同步阻塞等待,或者添加事件监听器进行异步非阻塞处理。
第7步:自我阻塞,直到通道关闭
第8步:关闭EventLoopGroup
关闭Reactor反应器线程组,同时会关闭内部的subReactor子反应器线程,也会关闭内部的Selector选择器、内部的轮询线程以及负责查询的所有的子通道。在子通道关闭后,会释放掉底层的资源,如TCP Socket文件描述符等。
详解Channel通道
Channel通道的主要成员和方法
通道的抽象类AbstractChanel构造函数
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel内部有一个pipeline属性,表示处理器的流水线。Netty在对通道进行初始化的时候,将pipeline属性初始化为DefaultChannelPipeline的实例。这段代码也表明,每个通道拥有一条ChannelPipeline处理器流水线。
AbstractChannel内部有一个parent属性,表示通道的父通道。对于连接监听通道(如NioServerSocketChannel实例)来说,其父亲通道为null;而对于每一条传输通道(如NioSocketChannel实例),其parent属性的值为接收到该连接的服务器连接监听通道。
重要方法
方法1. ChannelFuture connect(SocketAddress address)
此方法的作用为:连接远程服务器。方法的参数为远程服务器的地址,调用后会立即返回,返回值为负责连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用。
此方法的作用为:连接远程服务器。方法的参数为远程服务器的地址,调用后会立即返回,返回值为负责连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用。
方法2. ChannelFuture bind(SocketAddress address)
此方法的作用为:绑定监听地址,开始监听新的客户端连接。此方法在服务器的新连接监听和接收通道使用。
此方法的作用为:绑定监听地址,开始监听新的客户端连接。此方法在服务器的新连接监听和接收通道使用。
方法3. ChannelFuture close()
此方法的作用为:关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync( ) 方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。
此方法的作用为:关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync( ) 方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。
方法4. Channel read()
此方法的作用为:读取通道数据,并且启动入站处理。具体来说,从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。
此方法的作用为:读取通道数据,并且启动入站处理。具体来说,从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。
方法5. ChannelFuture write(Object o)
此方法的作用为:启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的异步处理任务。
此方法的作用为:启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的异步处理任务。
方法6. Channel flush()
此方法的作用为:将缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会将根据缓冲区的情况,决定什么时候把数据写到对端。而执行flush()方法立即将缓冲区的数据写到对端。
此方法的作用为:将缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会将根据缓冲区的情况,决定什么时候把数据写到对端。而执行flush()方法立即将缓冲区的数据写到对端。
EmbeddedChannel嵌入式通道
EmbeddedChannel仅仅是模拟入站与出站的操作,底层不进行实际的传输,不需要启动Netty服务器和客户端。除了不进行传输之外,EmbeddedChannel的其他的事件机制和处理流程和真正的传输通道是一模一样的。因此,使用它,开发人员可以在开发的过程中方便、快速地进行ChannelHandler业务处理器的单元测试。
提供了最为重要的两个方法为:writeInbound和readOutbound方法。
方法1. writeInbound入站数据写到通道
它的使用场景是:测试入站处理器。在测试入站处理器时(例如测试一个解码器),需要读取Inbound(入站)数据。可以调用writeInbound方法,向EmbeddedChannel写入一个入站二进制ByteBuf数据包,模拟底层的入站包。
它的使用场景是:测试入站处理器。在测试入站处理器时(例如测试一个解码器),需要读取Inbound(入站)数据。可以调用writeInbound方法,向EmbeddedChannel写入一个入站二进制ByteBuf数据包,模拟底层的入站包。
方法2. readOutbound读取通道的出站数据
它的使用场景是:测试出站处理器。在测试出站处理器时(例如测试一个编码器),需要查看处理过的结果数据。可以调用readOutbound方法,读取通道的最终出站结果,它是经过流水线一系列的出站处理后,最终的出站数据包。比较绕口,重复一遍,通过readOutbound,可以读取完成EmbeddedChannel最后一个出站处理器,处理后的ByteBuf二进制出站包。
它的使用场景是:测试出站处理器。在测试出站处理器时(例如测试一个编码器),需要查看处理过的结果数据。可以调用readOutbound方法,读取通道的最终出站结果,它是经过流水线一系列的出站处理后,最终的出站数据包。比较绕口,重复一遍,通过readOutbound,可以读取完成EmbeddedChannel最后一个出站处理器,处理后的ByteBuf二进制出站包。
详解Handler业务处理器
在Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。
整个的IO处理操作环节包括:从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端。
前后两个环节,从通道读数据包和由通道发送到对端,由Netty的底层负责完成,不需要用户程序负责。用户程序主要在Handler业务处理器中,Handler涉及的环节为:数据包解码、业务处理、目标数据编码、把数据包写到通道中。
整个的IO处理操作环节包括:从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端。
前后两个环节,从通道读数据包和由通道发送到对端,由Netty的底层负责完成,不需要用户程序负责。用户程序主要在Handler业务处理器中,Handler涉及的环节为:数据包解码、业务处理、目标数据编码、把数据包写到通道中。
入站和出站:
· 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。
· 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。
· 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。
· 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。
ChannelInboundHandler通道入站处理器(几个重要方法):
1.channelRegistered当通道注册完成后,Netty会调用fireChannelRegistered,触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到。
2.channelActive当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelActive方法,会被调用到。
3.channelRead当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRead方法,会被调用到。
4.channelReadComplete当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelReadComplete方法,会被调用到。
5.channelInactive当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件。通道会启动对应的流水线处理,在通道注册过的入站处理器Handler的channelInactive方法,会被调用到。
6.exceptionCaught当通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件。通道会启动异常捕获的流水线处理,在通道注册过的处理器Handler的exceptionCaught方法,会被调用到。注意,这个方法是在通道处理器中ChannelHandler定义的方法,入站处理器、出站处理器接口都继承到了该方法。
2.channelActive当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelActive方法,会被调用到。
3.channelRead当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRead方法,会被调用到。
4.channelReadComplete当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelReadComplete方法,会被调用到。
5.channelInactive当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件。通道会启动对应的流水线处理,在通道注册过的入站处理器Handler的channelInactive方法,会被调用到。
6.exceptionCaught当通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件。通道会启动异常捕获的流水线处理,在通道注册过的处理器Handler的exceptionCaught方法,会被调用到。注意,这个方法是在通道处理器中ChannelHandler定义的方法,入站处理器、出站处理器接口都继承到了该方法。
在Netty中,它的默认实现为ChannelInboundHandlerAdapter,在实际开发中,只需要继承这个ChannelInboundHandlerAdapter默认实现,重写自己需要的方法即可。
ChannelInboundHandler的执行回调流程:
ChannelHandler中的回调方法的执行顺序为:handlerAdded()→channelRegistered() → channelActive() → 入站方法回调 → channelInactive() →channelUnregistered()→ handlerRemoved()。其中,读数据的入站回调为:channelRead() →channelReadComplete();入站方法会多次调用,每一次有ByteBuf数据包入站都会调用到。
除了两个入站回调方法外,其余的6个方法都和ChannelHandler的生命周期有关,具体的介绍如下:
(1)handlerAdded() :当业务处理器被加入到流水线后,此方法被回调。也就是在完成ch.pipeline().addLast(handler)语句之后,会回调handlerAdded()。(2)channelRegistered():当通道成功绑定一个NioEventLoop线程后,会通过流水线回调所有业务处理器的channelRegistered()方法。
(3)channelActive():当通道激活成功后,会通过流水线回调所有业务处理器的channelActive()方法。通道激活成功指的是,所有的业务处理器添加、注册的异步任务完成,并且NioEventLoop线程绑定的异步任务完成。
(4)channelInactive():当通道的底层连接已经不是ESTABLISH状态,或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。(5)channelUnregistered():通道和NioEventLoop线程解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
(6)handlerRemoved():最后,Netty会移除掉通道上所有的业务处理器,并且回调所有的业务处理器的handlerRemoved()方法。
在上面的6个生命周期方法中,前面3个在通道创建的时候被先后回调,后面3个在通道关闭的时候会先后被回调。
(1)handlerAdded() :当业务处理器被加入到流水线后,此方法被回调。也就是在完成ch.pipeline().addLast(handler)语句之后,会回调handlerAdded()。(2)channelRegistered():当通道成功绑定一个NioEventLoop线程后,会通过流水线回调所有业务处理器的channelRegistered()方法。
(3)channelActive():当通道激活成功后,会通过流水线回调所有业务处理器的channelActive()方法。通道激活成功指的是,所有的业务处理器添加、注册的异步任务完成,并且NioEventLoop线程绑定的异步任务完成。
(4)channelInactive():当通道的底层连接已经不是ESTABLISH状态,或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。(5)channelUnregistered():通道和NioEventLoop线程解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
(6)handlerRemoved():最后,Netty会移除掉通道上所有的业务处理器,并且回调所有的业务处理器的handlerRemoved()方法。
在上面的6个生命周期方法中,前面3个在通道创建的时候被先后回调,后面3个在通道关闭的时候会先后被回调。
除了生命周期的回调,就是入站和出站处理的回调。对于Inhandler入站处理器,有两个很重要的回调方法为:
(1)channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
(2)channelReadComplete():流水线完成入站处理后,会从前向后,依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。
(1)channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
(2)channelReadComplete():流水线完成入站处理后,会从前向后,依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。
ChannelOutboundHandler通道出站处理器(几个重要方法):
1.bind监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。如果使用TCP传输协议,这个方法用于服务器端。
2.connect连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。
3.write写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。
4.flush腾空缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。
5.read从底层读数据:完成Netty通道从Java IO通道的数据读取。
6.disConnect断开服务器连接:断开底层Java IO通道的服务器端连接。如果使用TCP传输协议,此方法主要用于客户端。7.close主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道。
2.connect连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。
3.write写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。
4.flush腾空缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。
5.read从底层读数据:完成Netty通道从Java IO通道的数据读取。
6.disConnect断开服务器连接:断开底层Java IO通道的服务器端连接。如果使用TCP传输协议,此方法主要用于客户端。7.close主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道。
6.disConnect断开服务器连接:断开底层Java IO通道的服务器端连接。如果使用TCP传输协议,此方法主要用于客户端。7.close主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道。上面介绍的并不是ChannelOutboundHandler的全部方法,仅仅介绍了其中几个比较重要的方法。在Netty中,它的默认实现为ChannelOutboundHandlerAdapter,在实际开发中,只需要继承这个ChannelOutboundHandlerAdapter默认实现,重写自己需要的方法即可。
在Netty中,它的默认实现为ChannelOutboundHandlerAdapter,在实际开发中,只需要继承这个ChannelOutboundHandlerAdapter默认实现,重写自己需要的方法即可。
ChannelInitializer通道初始化处理器
ChannelInitializer也是通道初始化器,属于入站处理器的类型。在示例代码中,使用了ChannelInitializer的initChannel() 方法。它是何方神圣呢?
initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。
initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。
详解Pipeline流水线
一条Netty通道需要很多的Handler业务处理器来处理业务。每条通道内部都有一条流水线(Pipeline)将Handler装配起来。Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式(Chain of Responsibility)来设计的,内部是一个双向链表结构,能够支持动态地添加和删除Handler业务处理器。
Pipeline入站处理
按照inBoundHandler的加载顺序来依次执行inBoundChannel的方法,如先加载SimpleInBoundHandlerA,再加载SimpleInBoundHandlerB,就会先执行A的channelRead方法,再执行B的channelRread方法
调用inBoundHandler.channelXXX方法后,可以调用super.channelXXX()自动调用下一个inBoundHandler的方法,否则,操作流程会被截断.
调用inBoundHandler,channelXXX方法后,也可以通过调用ctx.fireChannelXXX方法,将入站处理传入到下一站。否则,操作流程会被截断。
调用inBoundHandler,channelXXX方法后,也可以通过调用ctx.fireChannelXXX方法,将入站处理传入到下一站。否则,操作流程会被截断。
Pipeline出站处理
outBoundHandler的处理顺序和加载顺序相反
调用outBoundHandler.xxx方法后,可以调用super.xxx()自动调用下一个outBoundHandler的方法,否则,操作流程会被截断.
调用outBoundHandler,xxx方法后,也可以通过调用ctx.xxx方法,将出站处理传入到下一站。否则,操作流程会被截断。
调用outBoundHandler,xxx方法后,也可以通过调用ctx.xxx方法,将出站处理传入到下一站。否则,操作流程会被截断。
ChannelHandlerContext上下文
在Handler业务处理器被添加到流水线中时,会创建一个通道处理器上下文ChannelHandlerContext,它代表了ChannelHandler通道处理器和ChannelPipeline通道流水线之间的关联。
ChannelHandlerContext中包含了有许多方法,主要可以分为两类:第一类是获取上下文所关联的Netty组件实例,如所关联的通道、所关联的流水线、上下文内部Handler业务处理器实例等;第二类是入站和出站处理方法。
在Channel、ChannelPipeline、ChannelHandlerContext三个类中,会有同样的出站和入站处理方法,同一个操作出现在不同的类中,功能有何不同呢?
如果通过Channel或ChannelPipeline的实例来调用这些方法,它们就会在整条流水线中传播。然而,如果是通过ChannelHandlerContext通道处理器上下文进行调用,就只会从当前的节点开始执行Handler业务处理器,并传播到同类型处理器的下一站(节点)。
思考:所以入站和出战,需要传递到下一个节点,可调用super.xxx方法或者ctx.xxx方法
如果通过Channel或ChannelPipeline的实例来调用这些方法,它们就会在整条流水线中传播。然而,如果是通过ChannelHandlerContext通道处理器上下文进行调用,就只会从当前的节点开始执行Handler业务处理器,并传播到同类型处理器的下一站(节点)。
思考:所以入站和出战,需要传递到下一个节点,可调用super.xxx方法或者ctx.xxx方法
Channel、Handler、ChannelHandlerContext三者的关系为:
Channel通道拥有一条ChannelPipeline通道流水线,每一个流水线节点为一个ChannelHandlerContext通道处理器上下文对象,每一个上下文中包裹了一个ChannelHandler通道处理器。在ChannelHandler通道处理器的入站/出站处理方法中,Netty都会传递一个Context上下文实例作为实际参数。通过Context实例的实参,在业务处理中,可以获取ChannelPipeline通道流水线的实例或者Channel通道的实例。
Channel通道拥有一条ChannelPipeline通道流水线,每一个流水线节点为一个ChannelHandlerContext通道处理器上下文对象,每一个上下文中包裹了一个ChannelHandler通道处理器。在ChannelHandler通道处理器的入站/出站处理方法中,Netty都会传递一个Context上下文实例作为实际参数。通过Context实例的实参,在业务处理中,可以获取ChannelPipeline通道流水线的实例或者Channel通道的实例。
截断流水线的处理
入站流水线截断:
(1)不调用supper.channelXxx(ChannelHandlerContext…)
(2)也不调用ctx.fireChannelXxx()
(1)不调用supper.channelXxx(ChannelHandlerContext…)
(2)也不调用ctx.fireChannelXxx()
出站流水线截断:
出站处理流程只要开始执行,就不能被截断。强行截断的话,Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。
出站处理流程只要开始执行,就不能被截断。强行截断的话,Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。
Handler业务处理器的热拔插
Netty中的处理器流水线是一个双向链表。在程序执行过程中,可以动态进行业务处理器的热拔插:动态地增加、删除流水线上的业务处理器Handler。主要的Handler热拔插方法声明在ChannelPipeline接口中
Netty中的例子:
Netty的通道初始化处理器——ChannelInitializer,在它的注册回调channelRegistered方法中,就使用了ctx.pipeline().remove(this),将自己从流水线中删除
Netty的通道初始化处理器——ChannelInitializer,在它的注册回调channelRegistered方法中,就使用了ctx.pipeline().remove(this),将自己从流水线中删除
详解ByteBuf缓冲区
属性:
· readerIndex(读指针):指示读取的起始位置。每读取一个字节,readerIndex自动增加1。一旦readerIndex与writerIndex相等,则表示ByteBuf不可读了。
· writerIndex(写指针):指示写入的起始位置。每写一个字节,writerIndex自动增加1。一旦增加到writerIndex与capacity()容量相等,则表示ByteBuf已经不可写了。capacity()是一个成员方法,不是一个成员属性,它表示ByteBuf中可以写入的容量。注意,它不是最大容量maxCapacity。
· maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。当向ByteBuf写数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity的值来设定,超过maxCapacity就会报错。
· readerIndex(读指针):指示读取的起始位置。每读取一个字节,readerIndex自动增加1。一旦readerIndex与writerIndex相等,则表示ByteBuf不可读了。
· writerIndex(写指针):指示写入的起始位置。每写一个字节,writerIndex自动增加1。一旦增加到writerIndex与capacity()容量相等,则表示ByteBuf已经不可写了。capacity()是一个成员方法,不是一个成员属性,它表示ByteBuf中可以写入的容量。注意,它不是最大容量maxCapacity。
· maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。当向ByteBuf写数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity的值来设定,超过maxCapacity就会报错。
三组方法
第一组:容量系列
· capacity():表示ByteBuf的容量,它的值是以下三部分之和:废弃的字节数、可读字节数和可写字节数。
· maxCapacity():表示ByteBuf最大能够容纳的最大字节数。当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到maxCapacity设定的上限。
· capacity():表示ByteBuf的容量,它的值是以下三部分之和:废弃的字节数、可读字节数和可写字节数。
· maxCapacity():表示ByteBuf最大能够容纳的最大字节数。当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到maxCapacity设定的上限。
第二组:写入系列
· isWritable() :表示ByteBuf是否可写。如果capacity()容量大于writerIndex指针的位置,则表示可写,否则为不可写。注意:如果isWritable()返回false,并不代表不能再往ByteBuf中写数据了。如果Netty发现往ByteBuf中写数据写不进去的话,会自动扩容ByteBuf。
· writableBytes() :取得可写入的字节数,它的值等于容量capacity()减去writerIndex。
· maxWritableBytes() :取得最大的可写字节数,它的值等于最大容量maxCapacity减去writerIndex。
· writeBytes(byte[] src) :把src字节数组中的数据全部写到ByteBuf。这是最为常用的一个方法。
· writeTYPE(TYPE value):写入基础数据类型的数据。TYPE表示基础数据类型,包含了8大基础数据类型。具体如下:writeByte()、 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble().
· setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值,包含了8大基础数据类型的设置。具体如下:setByte()、 setBoolean()、setChar()、setShort()、setInt()、setLong()、setFloat()、setDouble()。setType系列与writeTYPE系列的不同:setType系列不改变写指针writerIndex的值;writeTYPE系列会改变写指针writerIndex的值。
· markWriterIndex()与resetWriterIndex():这两个方法一起介绍。前一个方法表示把当前的写指针writerIndex属性的值保存在markedWriterIndex属性中;后一个方法表示把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中。markedWriterIndex属性相当于一个暂存属性,也定义在AbstractByteBuf抽象基类中。
· isWritable() :表示ByteBuf是否可写。如果capacity()容量大于writerIndex指针的位置,则表示可写,否则为不可写。注意:如果isWritable()返回false,并不代表不能再往ByteBuf中写数据了。如果Netty发现往ByteBuf中写数据写不进去的话,会自动扩容ByteBuf。
· writableBytes() :取得可写入的字节数,它的值等于容量capacity()减去writerIndex。
· maxWritableBytes() :取得最大的可写字节数,它的值等于最大容量maxCapacity减去writerIndex。
· writeBytes(byte[] src) :把src字节数组中的数据全部写到ByteBuf。这是最为常用的一个方法。
· writeTYPE(TYPE value):写入基础数据类型的数据。TYPE表示基础数据类型,包含了8大基础数据类型。具体如下:writeByte()、 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble().
· setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值,包含了8大基础数据类型的设置。具体如下:setByte()、 setBoolean()、setChar()、setShort()、setInt()、setLong()、setFloat()、setDouble()。setType系列与writeTYPE系列的不同:setType系列不改变写指针writerIndex的值;writeTYPE系列会改变写指针writerIndex的值。
· markWriterIndex()与resetWriterIndex():这两个方法一起介绍。前一个方法表示把当前的写指针writerIndex属性的值保存在markedWriterIndex属性中;后一个方法表示把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中。markedWriterIndex属性相当于一个暂存属性,也定义在AbstractByteBuf抽象基类中。
第三组:读取系列
· isReadable( ) :返回ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的值,则表示可读,否则为不可读。
· readableBytes( ) :返回表示ByteBuf当前可读取的字节数,它的值等于writerIndex减去readerIndex。
· readBytes(byte[] dst):读取ByteBuf中的数据。将数据从ByteBuf读取到dst字节数组中,这里dst字节数组的大小,通常等于readableBytes()。这个方法也是最为常用的一个方法之一。
· readType():读取基础数据类型,可以读取8大基础数据类型。具体如下:readByte()、readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()。
· getTYPE(TYPE value):读取基础数据类型,并且不改变指针值。具体如下:getByte()、getBoolean()、getChar()、getShort()、getInt()、getLong()、getFloat()、getDouble()。getType系列与readTYPE系列的不同:getType系列不会改变读指针readerIndex的值;readTYPE系列会改变读指针readerIndex的值。
· markReaderIndex( )与resetReaderIndex( ) :这两个方法一起介绍。前一个方法表示把当前的读指针ReaderIndex保存在markedReaderIndex属性中。后一个方法表示把保存在markedReaderIndex属性的值恢复到读指针ReaderIndex中。markedReaderIndex属性定义在AbstractByteBuf抽象基类中。
· isReadable( ) :返回ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的值,则表示可读,否则为不可读。
· readableBytes( ) :返回表示ByteBuf当前可读取的字节数,它的值等于writerIndex减去readerIndex。
· readBytes(byte[] dst):读取ByteBuf中的数据。将数据从ByteBuf读取到dst字节数组中,这里dst字节数组的大小,通常等于readableBytes()。这个方法也是最为常用的一个方法之一。
· readType():读取基础数据类型,可以读取8大基础数据类型。具体如下:readByte()、readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()。
· getTYPE(TYPE value):读取基础数据类型,并且不改变指针值。具体如下:getByte()、getBoolean()、getChar()、getShort()、getInt()、getLong()、getFloat()、getDouble()。getType系列与readTYPE系列的不同:getType系列不会改变读指针readerIndex的值;readTYPE系列会改变读指针readerIndex的值。
· markReaderIndex( )与resetReaderIndex( ) :这两个方法一起介绍。前一个方法表示把当前的读指针ReaderIndex保存在markedReaderIndex属性中。后一个方法表示把保存在markedReaderIndex属性的值恢复到读指针ReaderIndex中。markedReaderIndex属性定义在AbstractByteBuf抽象基类中。
知识点:get方法不会影响readIndex
ByteBuf的引用计数
Netty的ByteBuf的内存回收工作是通过引用计数的方式管理的。JVM中使用“计数器”(一种GC算法)来标记对象是否“不可达”进而收回(注:GC是Garbage Collection的缩写,即Java中的垃圾回收机制), Netty也使用了这种手段来对ByteBuf的引用进行计数。Netty采用“计数器”来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地“发现”那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。
什么是Pooled(池化)的ByteBuf缓冲区呢?在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。大家都知道,频繁创建对象、内存分配、释放内存,系统的开销大、性能低,如何提升性能、提高Buffer实例的使用率呢?从Netty4版本开始,新增了对象池化的机制。即创建一个Buffer对象池,将没有被引用的Buffer对象,放入对象缓存池中;当需要时,则重新从对象缓存池中取出,而不需要重新创建。
引用计数的大致规则如下:在默认情况下,当创建完一个ByteBuf时,它的引用为1;每次调用retain()方法,它的引用就加1;每次调用release()方法,就是将引用计数减1;如果引用为0,再次访问这个ByteBuf对象,将会抛出异常;如果引用为0,表示这个ByteBuf没有哪个进程引用它,它占用的内存需要回收。
为了确保引用计数不会混乱,在Netty的业务处理器开发过程中,应该坚持一个原则:retain和release方法应该结对使用。
如果retain和release这两个方法,一次都不调用呢?则在缓冲区使用完成后,调用一次release,就是释放一次。例如在Netty流水线上,中间所有的Handler业务处理器处理完ByteBuf之后直接传递给下一个,由最后一个Handler负责调用release来释放缓冲区的内存空间。
如果retain和release这两个方法,一次都不调用呢?则在缓冲区使用完成后,调用一次release,就是释放一次。例如在Netty流水线上,中间所有的Handler业务处理器处理完ByteBuf之后直接传递给下一个,由最后一个Handler负责调用release来释放缓冲区的内存空间。
当引用计数已经为0, Netty会进行ByteBuf的回收。分为两种情况:(1)Pooled池化的ByteBuf内存,回收方法是:放入可以重新分配的ByteBuf池子,等待下一次分配。(2)Unpooled未池化的ByteBuf缓冲区,回收分为两种情况:如果是堆(Heap)结构缓冲,会被JVM的垃圾回收机制回收;如果是Direct类型,调用本地方法释放外部内存(unsafe.freeMemory)。
ByteBuf的Allocator分配器
Netty通过ByteBufAllocator分配器来创建缓冲区和分配内存空间。Netty提供了ByteBufAllocator的两种实现:PoolByteBufAllocator和UnpooledByteBufAllocator。
PoolByteBufAllocator(池化ByteBuf分配器)将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小;这个池化分配器采用了jemalloc高效内存分配的策略,该策略被好几种现代操作系统所采用。
UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,它没有把ByteBuf放入池中,每次被调用时,返回一个新的ByteBuf实例;通过Java的垃圾回收机制回收。
PoolByteBufAllocator(池化ByteBuf分配器)将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小;这个池化分配器采用了jemalloc高效内存分配的策略,该策略被好几种现代操作系统所采用。
UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,它没有把ByteBuf放入池中,每次被调用时,返回一个新的ByteBuf实例;通过Java的垃圾回收机制回收。
在Netty中,默认的分配器为ByteBufAllocator.DEFAULT,可以通过Java系统参数(System Property)的选项io.netty.allocator.type进行配置,配置时使用字符串值:"unpooled", "pooled"。
不同的Netty版本,对于分配器的默认使用策略是不一样的。在Netty 4.0版本中,默认的分配器为UnpooledByteBufAllocator。而在Netty 4.1版本中,默认的分配器为PooledByteBufAllocator。现在PooledByteBufAllocator已经广泛使用了一段时间,并且有了增强的缓冲区泄漏追踪机制。因此,可以在Netty程序中设置启动器Bootstrap的时候,将PooledByteBufAllocator设置为默认的分配器。
不同的Netty版本,对于分配器的默认使用策略是不一样的。在Netty 4.0版本中,默认的分配器为UnpooledByteBufAllocator。而在Netty 4.1版本中,默认的分配器为PooledByteBufAllocator。现在PooledByteBufAllocator已经广泛使用了一段时间,并且有了增强的缓冲区泄漏追踪机制。因此,可以在Netty程序中设置启动器Bootstrap的时候,将PooledByteBufAllocator设置为默认的分配器。
ByteBuf缓冲区类型
堆缓冲区,Heap ByteBuf,内部数据为一个Java数组,存储在JVM的堆空间中,通过hasArray来判断是不是堆缓存区。
优点:未使用池化的情况下,能提供快速的分配和释放。
不足:写入底层传输通道之前,都会复制到直接缓冲区。
优点:未使用池化的情况下,能提供快速的分配和释放。
不足:写入底层传输通道之前,都会复制到直接缓冲区。
直接缓冲区,Direct ByteBuf,内部数据存储在操作系统的物理内存中。
优点:能获取超过JVM堆限制大小的内存空间;写入传输通道比堆缓冲区更快
不足:释放和分配空间昂贵(使用系统的方法);在Java中操作时需要先复制到堆上
优点:能获取超过JVM堆限制大小的内存空间;写入传输通道比堆缓冲区更快
不足:释放和分配空间昂贵(使用系统的方法);在Java中操作时需要先复制到堆上
· Direct Memory不属于Java堆内存,所分配的内存其实是调用操作系统malloc()函数来获得的;由Netty的本地内存堆Native堆进行管理。
· Direct Memory容量可通过-XX:MaxDirectMemorySize来指定,如果不指定,则默认与Java堆的最大值(-Xmx指定)一样。注意:并不是强制要求,有的JVM默认Direct Memory与-Xmx无直接关系。
· Direct Memory的使用避免了Java堆和Native堆之间来回复制数据。在某些应用场景中提高了性能。
· 在需要频繁创建缓冲区的场合,由于创建和销毁Direct Buffer(直接缓冲区)的代价比较高昂,因此不宜使用Direct Buffer。也就是说,Direct Buffer尽量在池化分配器中分配和回收。如果能将Direct Buffer进行复用,在读写频繁的情况下,就可以大幅度改善性能。
· 对Direct Buffer的读写比Heap Buffer快,但是它的创建和销毁比普通Heap Buffer慢。
· 在Java的垃圾回收机制回收Java堆时,Netty框架也会释放不再使用的Direct Buffer缓冲区,因为它的内存为堆外内存,所以清理的工作不会为Java虚拟机(JVM)带来压力。注意一下垃圾回收的应用场景:(1)垃圾回收仅在Java堆被填满,以至于无法为新的堆分配请求提供服务时发生;(2)在Java应用程序中调用System.gc()函数来释放内存。
· Direct Memory容量可通过-XX:MaxDirectMemorySize来指定,如果不指定,则默认与Java堆的最大值(-Xmx指定)一样。注意:并不是强制要求,有的JVM默认Direct Memory与-Xmx无直接关系。
· Direct Memory的使用避免了Java堆和Native堆之间来回复制数据。在某些应用场景中提高了性能。
· 在需要频繁创建缓冲区的场合,由于创建和销毁Direct Buffer(直接缓冲区)的代价比较高昂,因此不宜使用Direct Buffer。也就是说,Direct Buffer尽量在池化分配器中分配和回收。如果能将Direct Buffer进行复用,在读写频繁的情况下,就可以大幅度改善性能。
· 对Direct Buffer的读写比Heap Buffer快,但是它的创建和销毁比普通Heap Buffer慢。
· 在Java的垃圾回收机制回收Java堆时,Netty框架也会释放不再使用的Direct Buffer缓冲区,因为它的内存为堆外内存,所以清理的工作不会为Java虚拟机(JVM)带来压力。注意一下垃圾回收的应用场景:(1)垃圾回收仅在Java堆被填满,以至于无法为新的堆分配请求提供服务时发生;(2)在Java应用程序中调用System.gc()函数来释放内存。
CompositeBuffer,多个缓冲区的组合表示
优点:方便一次操作多个缓冲区实例
优点:方便一次操作多个缓冲区实例
Heap ByteBuf 和 Direct ByteBuf的不同:
· 创建的方法不同:Heap ByteBuf通过调用分配器的buffer()方法来创建;而Direct ByteBuf的创建,是通过调用分配器的directBuffer()方法。
· Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组;而Direct ByteBuf缓冲区不能读取内部数组。
· 可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区;如果hasArray()返回值为true,则表示是Heap堆缓冲,否则就不是。
注意,如果hasArray()返回false,不一定代表缓冲区一定就是Direct ByteBuf直接缓冲区,也有可能是CompositeByteBuf缓冲区。
如果内部只存在一个实例,则CompositeByteBuf中的hasArray()方法,将返回这个唯一实例的hasArray()方法的值;如果有多个实例,CompositeByteBuf中的hasArray()方法返回false。
· Direct ByteBuf要读取缓冲数据进行业务处理,相对比较麻烦,需要通过getBytes/readBytes等方法先将数据复制到Java的堆内存,然后进行其他的计算。
· 创建的方法不同:Heap ByteBuf通过调用分配器的buffer()方法来创建;而Direct ByteBuf的创建,是通过调用分配器的directBuffer()方法。
· Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组;而Direct ByteBuf缓冲区不能读取内部数组。
· 可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区;如果hasArray()返回值为true,则表示是Heap堆缓冲,否则就不是。
注意,如果hasArray()返回false,不一定代表缓冲区一定就是Direct ByteBuf直接缓冲区,也有可能是CompositeByteBuf缓冲区。
如果内部只存在一个实例,则CompositeByteBuf中的hasArray()方法,将返回这个唯一实例的hasArray()方法的值;如果有多个实例,CompositeByteBuf中的hasArray()方法返回false。
· Direct ByteBuf要读取缓冲数据进行业务处理,相对比较麻烦,需要通过getBytes/readBytes等方法先将数据复制到Java的堆内存,然后进行其他的计算。
ByteBuf 的自动释放
入站
方式一:TailHandler自动释放
Netty默认会在ChannelPipline通道流水线的最后添加一个TailHandler末尾处理器,它实现了默认的处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作。
如果自定义的InboundHandler入站处理器继承自ChannelInboundHandlerAdapter适配器,那么可以调用以下两种方法来释放ByteBuf内存:
(1)手动释放ByteBuf。具体的方式为调用byteBuf.release()。
(2)调用父类的入站方法将msg向后传递,依赖后面的处理器释放ByteBuf。具体的方式为调用基类的入站处理方法super.channelRead(ctx, msg)。
Netty默认会在ChannelPipline通道流水线的最后添加一个TailHandler末尾处理器,它实现了默认的处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作。
如果自定义的InboundHandler入站处理器继承自ChannelInboundHandlerAdapter适配器,那么可以调用以下两种方法来释放ByteBuf内存:
(1)手动释放ByteBuf。具体的方式为调用byteBuf.release()。
(2)调用父类的入站方法将msg向后传递,依赖后面的处理器释放ByteBuf。具体的方式为调用基类的入站处理方法super.channelRead(ctx, msg)。
方式二:SimpleChannelInboundHandler自动释放
如果Handler业务处理器需要截断流水线的处理流程,不将ByteBuf数据包送入后边的InboundHandler入站处理器,这时,流水线末端的TailHandler末尾处理器自动释放缓冲区的工作自然就失效了。在这种场景下,Handler业务处理器有两种选择:
· 手动释放ByteBuf实例。
· 继承SimpleChannelInboundHandler,利用它的自动释放功能。
如果Handler业务处理器需要截断流水线的处理流程,不将ByteBuf数据包送入后边的InboundHandler入站处理器,这时,流水线末端的TailHandler末尾处理器自动释放缓冲区的工作自然就失效了。在这种场景下,Handler业务处理器有两种选择:
· 手动释放ByteBuf实例。
· 继承SimpleChannelInboundHandler,利用它的自动释放功能。
出站
出站缓冲区的自动释放方式:HeadHandler自动释放。在出站处理流程中,申请分配到的ByteBuf主要是通过HeadHandler完成自动释放的。
出站处理用到的Bytebuf缓冲区,一般是要发送的消息,通常由Handler业务处理器所申请而分配的。例如,在write出站写入通道时,通过调用ctx.writeAndFlush(Bytebufmsg), Bytebuf缓冲区进入出站处理的流水线。在每一个出站Handler业务处理器中的处理完成后,最后数据包(或消息)会来到出站的最后一棒HeadHandler,在数据输出完成后,Bytebuf会被释放一次,如果计数器为零,将被彻底释放掉。
出站处理用到的Bytebuf缓冲区,一般是要发送的消息,通常由Handler业务处理器所申请而分配的。例如,在write出站写入通道时,通过调用ctx.writeAndFlush(Bytebufmsg), Bytebuf缓冲区进入出站处理的流水线。在每一个出站Handler业务处理器中的处理完成后,最后数据包(或消息)会来到出站的最后一棒HeadHandler,在数据输出完成后,Bytebuf会被释放一次,如果计数器为零,将被彻底释放掉。
ByteBuf浅层复制
ByteBuf的浅层复制分为两种,有切片(slice)浅层复制和整体(duplicate)浅层复制。
Decoder与Encoder重要组件
Decoder解码器
因为底层存在状态,所以都不能使用共享模式
因为底层存在状态,所以都不能使用共享模式
ByteToMessageDecoder
ByteBuf 转 POJO,需要在处理器自行解决
需记录读取数据状态,所以不能使用共享模式
需记录读取数据状态,所以不能使用共享模式
MessageToMessageDecoder
POJO 转 POJO
开箱即用的Netty内置Decoder
(1)固定长度数据包解码器——FixedLengthFrameDecoder
适用场景:每个接收到的数据包的长度,都是固定的,例如100个字节。
在这种场景下,只需要把这个解码器加到流水线中,它会把入站ByteBuf数据包拆分成一个个长度为100的数据包,然后发往下一个channelHandler入站处理器。补充说明一下:这里所指的一个数据包,在Netty中就是一个ByteBuf实例。注:数据帧(Frame),本书也通称为数据包。
适用场景:每个接收到的数据包的长度,都是固定的,例如100个字节。
在这种场景下,只需要把这个解码器加到流水线中,它会把入站ByteBuf数据包拆分成一个个长度为100的数据包,然后发往下一个channelHandler入站处理器。补充说明一下:这里所指的一个数据包,在Netty中就是一个ByteBuf实例。注:数据帧(Frame),本书也通称为数据包。
(2)行分割数据包解码器——LineBasedFrameDecoder
适用场景:每个ByteBuf数据包,使用换行符(或者回车换行符)作为数据包的边界分割符。即 "\n"或“\r\n”
如果每个接收到的数据包,都以换行符/回车换行符作为分隔。在这种场景下,只需要把这个解码器加到流水线中,Netty会使用换行分隔符,把ByteBuf数据包分割成一个一个完整的应用层ByteBuf数据包,再发送到下一站。
适用场景:每个ByteBuf数据包,使用换行符(或者回车换行符)作为数据包的边界分割符。即 "\n"或“\r\n”
如果每个接收到的数据包,都以换行符/回车换行符作为分隔。在这种场景下,只需要把这个解码器加到流水线中,Netty会使用换行分隔符,把ByteBuf数据包分割成一个一个完整的应用层ByteBuf数据包,再发送到下一站。
(3)自定义分隔符数据包解码器——DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder是LineBasedFrameDecoder按照行分割的通用版本。不同之处在于,这个解码器更加灵活,可以自定义分隔符,而不是局限于换行符。如果使用这个解码器,那么接收到的数据包,末尾必须带上对应的分隔符。
DelimiterBasedFrameDecoder是LineBasedFrameDecoder按照行分割的通用版本。不同之处在于,这个解码器更加灵活,可以自定义分隔符,而不是局限于换行符。如果使用这个解码器,那么接收到的数据包,末尾必须带上对应的分隔符。
(4)自定义长度数据包解码器——LengthFieldBasedFrameDecoder
这是一种基于灵活长度的解码器。在ByteBuf数据包中,加了一个长度字段,保存了原始数据包的长度。解码的时候,会按照这个长度进行原始数据包的提取。
这是一种基于灵活长度的解码器。在ByteBuf数据包中,加了一个长度字段,保存了原始数据包的长度。解码的时候,会按照这个长度进行原始数据包的提取。
Encoder编码器
0 条评论
下一页