Nio/Netty笔记
2023-05-15 14:49:29 40 举报
AI智能生成
Nio/Netty是一个高性能的网络编程框架,用于构建可扩展、高吞吐量和低延迟的服务器应用程序。它基于Java NIO(非阻塞I/O)和Netty库,提供了一套简单易用的API,使得开发者能够快速地实现复杂的网络通信逻辑。Nio/Netty的核心优势在于其事件驱动的设计,通过高效的I/O多路复用技术,实现了并发处理大量连接的能力。此外,Nio/Netty还提供了丰富的编解码器支持,可以轻松地处理各种数据格式,如文本、二进制和压缩数据。总之,Nio/Netty是Java网络编程领域的一大利器,值得学习和掌握。
作者其他创作
大纲/内容
IO模型
Java NIO
Java NIO三大核心组件
缓冲区(Buffer)
Buffer分类
ByteBuffer
DirectByteBuffer
直接内存缓冲区
HeapByteBuffer
堆内存缓冲区
MappedByteBuffer
专门用于内存映射的一种ByteBuffer类型
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
CharBuffer
ByteBuffer关键属性
capacity(容量)
缓冲区的容量。通过构造函数赋予,一旦设置,无法更改
也就是说,一旦写入对象超过了capacity容量,缓冲区就满了,不能再写入了。
limit(读写的限制)
定义
缓冲区的界限。位于limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量
position(读写位置)
定义
下一个读写位置的索引(类似PC)。缓冲区的位置不能为负,并且不能大于limit
position属性和缓冲区的读写模式有关,在不同模式下,position属性的值是不同的。当缓冲区进行读写的模式改变时,position会进行调整。
mark(标记)
记录当前position的值。position被改变后,可以通过调用reset() 方法恢复到mark的位置。
关键方法
put()
put()方法可以将一个数据放入到缓冲区中。
进行该操作后,postition的值会+1,指向下一个可以放入的位置。capacity = limit ,为缓冲区容量的值。
flip()
flip()方法会切换对缓冲区的操作模式,由写->读 / 读->写
进行该操作后
如果是写模式->读模式,position = 0 , limit 指向最后一个元素的下一个位置,capacity不变
如果是读->写,则恢复为put()方法中的值
get()
get()方法会读取缓冲区中的一个值
进行该操作后,position会+1,如果超过了limit则会抛出异常
注意:get(i)方法不会改变position的值
rewind()
该方法只能在读模式下使用
rewind()方法后,会恢复position、limit和capacity的值,变为进行get()前的值
clean()
clean()方法会将缓冲区中的各个属性恢复为最初的状态,position = 0, capacity = limit
此时缓冲区的数据依然存在,处于“被遗忘”状态,下次进行写操作时会覆盖这些数据
mark()
mark()方法会将postion的值保存到mark属性中
reset()
reset()方法会将position的值改为mark中保存的值
compact()
compact会把未读完的数据向前压缩,然后切换到写模式
数据前移后,原位置的值并未清零,写时会覆盖之前的值
clear() VS compact()
clear只是对position、limit、mark进行重置,而compact在对position进行设置,以及limit、mark进行重置的同时,还涉及到数据在内存中拷贝(会调用arraycopy)。所以compact比clear更耗性能。
但compact能保存你未读取的数据,将新数据追加到为读取的数据之后;而clear则不行,若你调用了clear,则未读取的数据就无法再读取到了。
ByteBuffer调试工具类
使用
调用例子
创建缓冲区,向缓冲区中写入数据,例如调用 channel.read(buffer)
调用 flip() 切换至读模式
flip会使得buffer中的limit变为position,position变为0
从 buffer 读取数据,例如调用 buffer.get()
调用 clear() 或者compact()切换至写模式
调用clear()方法时position=0,limit变为capacity
调用compact()方法时,会将缓冲区中的未读数据压缩到缓冲区前面
重复以上步骤
字符串与ByteBuffer的相互转换
例子1
编码:字符串调用getByte方法获得byte数组,将byte数组放入ByteBuffer中
解码:先调用ByteBuffer的flip方法,然后通过StandardCharsets的decoder方法解码
例子2
编码:通过StandardCharsets的encode方法获得ByteBuffer,此时获得的ByteBuffer为读模式,无需通过flip切换模式
解码:通过StandardCharsets的decoder方法解码
例子3
编码:字符串调用getByte()方法获得字节数组,将字节数组传给ByteBuffer的wrap()方法,通过该方法获得ByteBuffer。同样无需调用flip方法切换为读模式
解码:通过StandardCharsets的decoder方法解码
通道(Channel)
主要类型
FileChannel
文件通道,用于文件的数据读写
SocketChannel
套接字通道,用于socket套接字tcp连接的数据读写
ServerSocketChannel
3.ServerSocketChannel服务器嵌套字通道(或者服务器监听通道),允许我们监听tcp连接请求,为每个监听到的请求,创建一个SocketChannel套接字通道。
DatagramChannel
数据报通道,用于UDP协议的数据读写。
主要类型详解
FileChannel
工作模式
FileChannel为阻塞模式,不能设置为非阻塞模式,所以无法搭配Selector。
获取
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream
或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法
或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法
通过 FileInputStream 获取的 channel 只能读
通过 FileOutputStream 获取的 channel 只能写
通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
常用方法
读取
int read(ByteBuffer dst) 从Channel到中读取数据到ByteBuffer
read方法的返回值表示读到了多少字节,若读到了文件末尾则返回-1
所以可根据返回值判断是否读取完毕
while(channel.read(buffer) > 0) {
// 进行对应操作
}
// 进行对应操作
}
分散读取(Scatter )
long read(ByteBuffer[] dsts) 将Channel到中的数据“分散”到ByteBuffer[]
写入
int write(ByteBuffer src)将ByteBuffer 到中的数据写入到 Channel
因为channel也是有大小的,所以 write 方法并不能保证一次将 buffer
中的内容全部写入 channel。必须需要按照以下规则进行写入
中的内容全部写入 channel。必须需要按照以下规则进行写入
// 通过hasRemaining()方法查看缓冲区中是否还有数据未写入到通道中
while(buffer.hasRemaining()) {
channel.write(buffer);
}
while(buffer.hasRemaining()) {
channel.write(buffer);
}
聚集写入(Gathering )
long write(ByteBuffer[] srcs)将ByteBuffer[] 到中的数据“聚集”到 Channel
位置
long position() 返回此通道的文件位置
FileChannel position(long p) 设置此通道的文件位置
设置当前位置时,如果设置为文件的末尾,这时读取会返回 -1,这时写入,会追加内容
注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)
强制写入
void force(boolean metaData) 操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,而是等到缓存满了以后将所有数据一次性的写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘。
两个Channel传输数据
transferFrom()
从目标通道中去复制原通道数据
transferTo()
使用transferTo方法可以快速、高效地将一个channel中的数据传输到另一个channel中,但一次只能传输2G的内容
当传输的文件大于2G时,需要使用备注中方法进行多次传输
关闭
通道需要close,一般情况通过try-with-resource进行关闭,最好使用
以下方法获取strea以及channel,避免某些原因使得资源未被关闭
以下方法获取strea以及channel,避免某些原因使得资源未被关闭
大小
long size() 返回此通道的文件的当前大小
FileChannel truncate(long s) 将此通道的文件截取为给定大小
其他文件编程相关API
ServerSocketChannel与SocketChannel
在NIO中,设计网络连接的通道有两个,一个是SocketChannel负责连接传输,一个是ServerSocketChannel负责连接的监听
ServerSocketChannel应用于服务器端,而SocketChannel同时处于服务器端和客户端,换句话说,对应一个连接,两端都有一个负责传输的SocketChannel传输通道。
非阻塞情况下,与服务器的连接还有可能没有真正建立,socketChannel.connect方法就返回了,因此需要不断的自旋
需要通过finishConnect()方法判断是否连接到服务端
ServerSocketChannel监听套接字的accept()方法,来获取新连接的套接字通道。
DatagramChannel
SctpChannel与NioSctpServerChannel
Selector选择器
定义
选择器的使命时完成IO的多路复用,一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出状况)。选择器和通道的关系,是监控和被监控的
使用
创建Selector
通过调用 Selector.open() 方法创建一个 Selector。
Selector.open()源码
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
return SelectorProvider.provider().openSelector();
}
通道和选择器之间的关系,通过register(注册)的方式完成。调用通道的Channel.register(Selector sel, int ops)方法,可以将通道实例注册到一个选择器中。
一条通道若能被选择,必须继承SelectableChannel类
IO事件
类型
连接:SelectionKey.OP_CONNECT
某个SocketChannel通道,完成了和对端的握手连接,则处于连接就绪状态
客户端连接成功时触发
接收:SelectionKey.OP_ACCEPT
某个ServerSocketChannel服务器通道,监听到一个新连接的到来,则处于接收就绪状态
服务器端成功接受连接时触发
可读: SelectionKey.OP_READ
一个数据可读的SocketChannel通道,处于读就绪状态。
数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
可写:SelectionKey.OP_WRITE
一个等待写入数据的,处于写就绪状态
数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
注意
服务器监听通道ServerSocketChannel,仅仅支持Accept(接收到新连接)IO事件,而SocketChannel传输通道,则不支持Accept (接收到新连接) IO事件
SelectionKey选择键
通过select方法,选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的感兴趣的那些IO事件
换句话说,一旦在通道中发生了某些IO事件(就绪状态达成),并且是在选择器中注册过的IO事件,就会被选择器选中,并放入SelectionKey选择键的集合中。
SelectionKey选择键就是那些被选择器选中的IO事件
处理socket连接的三种方式与对比
使用多线程技术
为每个连接分别开辟一个线程,分别去处理对应的socket连接
存在问题
内存占用高
每个线程都需要占用一定的内存,当连接较多时,会开辟大量线程,导致占用大量内存
线程上下文切换成本高
只适合连接数少的场景
连接数过多,会导致创建很多线程,从而出现问题
使用线程池技术
阻塞模式下,线程仅能处理一个连接
线程池中的线程获取任务(task)后,只有当其执行完任务之后(断开连接后),才会去获取并执行下一个任务
若socke连接一直未断开,则其对应的线程无法处理其他socket连接
仅适合短连接场景
短连接即建立连接发送请求并响应后就立即断开,使得线程池中的线程可以快速处理其他连接
使用选择器
selector 的作用就是配合一个线程来管理多个 channel(fileChannel因为是阻塞式的,所以无法使用selector),获取这些 channel 上发生的事件。
这些 channel 工作在非阻塞模式下,当一个channel中没有执行任务时,可以去执行其他channel中的任务。
适合连接数多,但流量较少的场景。
若事件未就绪,调用 selector 的 select() 方法会阻塞线程,直到 channel 发生了就绪事件。这些事件就绪后,select 方法就会返回这些事件交给 thread 来处理。
NIO中的粘包和半包问题
现象
网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
Hello,world\n
I’m Nyima\n
How are you?\n
变成了下面的两个 byteBuffer (粘包,半包)
Hello,world\nI’m Nyima\nHo
w are you?\n
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
Hello,world\n
I’m Nyima\n
How are you?\n
变成了下面的两个 byteBuffer (粘包,半包)
Hello,world\nI’m Nyima\nHo
w are you?\n
出现原因
粘包
发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去。
半包
接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象
解决办法
利用分割符确定数据开始和结束的位置,分别进行处理。
代码
网络编程
Netty依赖
通道(Channel)和缓冲区(Buffer)
通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。
然后操作缓冲区,对数据进行处理
然后操作缓冲区,对数据进行处理
通道负责传输,缓冲区负责存储
netty依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
没引入Selector之前
阻塞
阻塞模式下,有些方法会导致线程暂停。阻塞的表现其实就是
线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置。
线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置。
ServerSocketChannel.accept 会在没有连接建立时让线程暂停
SocketChannel.read 会在通道中没有数据可读时让线程暂停
单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
但多线程下,有新的问题
32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
测试代码
服务端代码
客户端代码
debug步骤
只启动服务端
客户端-服务器建立连接前:服务器端因accept阻塞
给客户端添加断点,启动客户端
客户端-服务器建立连接后,客户端发送消息前:服务器端因通道为空被阻塞
在客户端(evaluate)发送消息
客户端发送数据后,服务器处理通道中的数据。再次进入循环时,再次被accept阻塞
同一个客户端再次(evaluate)发送消息
之前的客户端再次发送消息,服务器端因为被accept阻塞,无法处理之前客户端发送到通道中的信息
非阻塞
可以通过ServerSocketChannel的configureBlocking(false)方法将获得连接设置为非阻塞的。此时若没有连接,accept会返回null
可以通过SocketChannel的configureBlocking(false)方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read会返回-1
测试代码
这样写存在一个问题,因为设置为了非阻塞,会一直执行while(true)中的代码,CPU
一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求。
一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求。
引入Selector之后
多路复用
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用
多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用
优点
如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
有可连接事件时才去连接
有可读事件才去读取
有可写事件才去写入
限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
优化代码
步骤解析
获得选择器Selector
Selector selector = Selector.open();
将通道设置为非阻塞模式,并注册到选择器中,并设置感兴趣的事件
channel 必须工作在非阻塞模式
FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
绑定事件类型
通过Selector监听事件,并获得就绪的通道个数,若没有通道就绪,线程会被阻塞
阻塞直到绑定事件发生
int count = selector.select();
阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count = selector.selectNow();
获取就绪事件并得到对应的通道,然后进行处理
// 获取所有事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 使用迭代器遍历事件
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 判断key的类型,此处为Accept类型
if(key.isAcceptable()) {
// 获得key对应的channel
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 获取连接并处理,而且是必须处理,否则需要取消
SocketChannel socketChannel = channel.accept();
// 处理完毕后移除
iterator.remove();
}
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 使用迭代器遍历事件
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 判断key的类型,此处为Accept类型
if(key.isAcceptable()) {
// 获得key对应的channel
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 获取连接并处理,而且是必须处理,否则需要取消
SocketChannel socketChannel = channel.accept();
// 处理完毕后移除
iterator.remove();
}
}
事件发生后能否不处理?
事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发。
各种事件的处理
Read事件
在Accept事件中,若有客户端与服务器端建立了连接,需要将其对应的SocketChannel设置为非阻塞,并注册到选择其中
添加Read事件,触发后进行读取操作
删除事件
当处理完一个事件后,一定要调用迭代器的remove方法移除对应事件,否则会导致已被处理过的事件再次被处理,就会引发错误。
代码说明
断开处理
当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件,对异常断开和正常断开需要加以不同的方式进行处理
正常断开
正常断开时,服务器端的channel.read(buffer)方法的返回值为-1,所以当结束到返回值为-1时,
需要调用key的cancel方法取消此事件,并在取消后移除该事件。
需要调用key的cancel方法取消此事件,并在取消后移除该事件。
异常断开
异常断开时,会抛出IOException异常, 在try-catch的catch块中捕获异常并调用key的cancel方法即可
消息边界
不处理消息边界存在的问题
将缓冲区的大小设置为4个字节,发送2个汉字(你好),通过decode解码并打印时,会出现乱码
ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));
这是因为UTF-8字符集下,1个汉字占用3个字节,此时缓冲区大小为4个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好 的 好 字被拆分为了前半部分和后半部分发送,解码时就会出现问题。
处理消息边界
传输的文本可能有三种情况
文本大于缓冲区大小,此时需要将缓冲区进行扩容
发生半包现象
发生粘包现象
解决思路
固定消息长度,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽。
另一种思路是按分隔符拆分,缺点是效率低,需要一个一个字符地去匹配分隔符
TLV 格式,即 Type 类型、Length 长度、Value 数据(也就是在消息开头用一些空间存放后面数据的长度),如HTTP请求头中的Content-Type与Content-Length。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量。
Http 1.1 是 TLV 格式
Http 2.0 是 LTV 格式
附件与扩容
Channel的register方法还有第三个参数:附件,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的SelectionKey绑定,可以从SelectionKey获取到对应通道的附件
public final SelectionKey register(Selector sel, int ops, Object att)
可通过SelectionKey的attachment()方法获得附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
我们需要在Accept事件发生后,将通道注册到Selector中时,对每个通道添加一个ByteBuffer
附件,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题。
附件,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题。
// 设置为非阻塞模式,同时将连接的通道也注册到选择其中,同时设置附件
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 添加通道对应的Buffer附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 添加通道对应的Buffer附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);
当Channel中的数据大于缓冲区时,需要对缓冲区进行扩容操作。此代码中的扩容的判定方法:Channel调用compact方法后,的position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中。
// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
// 将旧buffer中的内容放入新的buffer中
ewBuffer.put(buffer);
// 将新buffer作为附件放到key中
key.attach(newBuffer);
}
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
// 将旧buffer中的内容放入新的buffer中
ewBuffer.put(buffer);
// 将新buffer作为附件放到key中
key.attach(newBuffer);
}
ByteBuffer的大小分配
每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
分配思路可以参考
一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
Write事件
服务器通过Buffer向通道中写入数据时,可能因为通道容量小于Buffer中的数据大小,导致
无法一次性将Buffer中的数据全部写入到Channel中,这时便需要分多次写入,具体步骤如下:
无法一次性将Buffer中的数据全部写入到Channel中,这时便需要分多次写入,具体步骤如下:
1. 执行一次写操作,向将buffer中的内容写入到SocketChannel中,然后判断Buffer中是否还有数据
2. 若Buffer中还有数据,则需要将SocketChannel注册到Seletor中,并
关注写事件,同时将未写完的Buffer作为附件一起放入到SelectionKey中
关注写事件,同时将未写完的Buffer作为附件一起放入到SelectionKey中
int write = socket.write(buffer);
// 通道中可能无法放入缓冲区中的所有数据
if (buffer.hasRemaining()) {
// 注册到Selector中,关注可写事件,并将buffer添加到key的附件中
socket.configureBlocking(false);
socket.register(selector, SelectionKey.OP_WRITE, buffer);
}
// 通道中可能无法放入缓冲区中的所有数据
if (buffer.hasRemaining()) {
// 注册到Selector中,关注可写事件,并将buffer添加到key的附件中
socket.configureBlocking(false);
socket.register(selector, SelectionKey.OP_WRITE, buffer);
}
3. 添加写事件的相关操作key.isWritable(),对Buffer再次进行写操作
每次写后需要判断Buffer中是否还有数据(是否写完)。若写完,需要移除SelecionKey
中的Buffer附件,避免其占用过多内存,同时还需移除对写事件的关注。
中的Buffer附件,避免其占用过多内存,同时还需移除对写事件的关注。
SocketChannel socket = (SocketChannel) key.channel();
// 获得buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 执行写操作
int write = socket.write(buffer);
System.out.println(write);
// 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣
if (!buffer.hasRemaining()) {
key.attach(null);
key.interestOps(0);
}
// 获得buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 执行写操作
int write = socket.write(buffer);
System.out.println(write);
// 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣
if (!buffer.hasRemaining()) {
key.attach(null);
key.interestOps(0);
}
优化
多线程优化
充分利用多核CPU,分两组选择器
单线程配一个选择器(Boss),专门处理 accept 事件
创建 cpu 核心数的线程(Worker),每个线程配一个选择器,轮流处理 read 事件
实现思路
创建一个负责处理Accept事件的Boss线程,与多个负责处理Read事件的Worker线程
Boss线程执行的操作
接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,
让Worker去处理Read事件,其中需要根据标识robin去判断将任务分配给哪个Worker
让Worker去处理Read事件,其中需要根据标识robin去判断将任务分配给哪个Worker
// 创建固定数量的Worker
Worker[] workers = new Worker[4];
// 用于负载均衡的原子整数
AtomicInteger robin = new AtomicInteger(0);
// 负载均衡,轮询分配Worker
workers[robin.getAndIncrement()% workers.length].register(socket);
Worker[] workers = new Worker[4];
// 用于负载均衡的原子整数
AtomicInteger robin = new AtomicInteger(0);
// 负载均衡,轮询分配Worker
workers[robin.getAndIncrement()% workers.length].register(socket);
register(SocketChannel socket)方法会通过同步队列完成Boss线程与Worker线程之间的通信,让SocketChannel
的注册任务被Worker线程执行。添加任务后需要调用selector.wakeup()来唤醒被阻塞的Selector
的注册任务被Worker线程执行。添加任务后需要调用selector.wakeup()来唤醒被阻塞的Selector
public void register(final SocketChannel socket) throws IOException {
// 只启动一次
if (!started) {
// 初始化操作
}
// 向同步队列中添加SocketChannel的注册事件
// 在Worker线程中执行注册事件
queue.add(new Runnable() {
@Override
public void run() {
try {
socket.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 唤醒被阻塞的Selector
// select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
selector.wakeup();
}
// 只启动一次
if (!started) {
// 初始化操作
}
// 向同步队列中添加SocketChannel的注册事件
// 在Worker线程中执行注册事件
queue.add(new Runnable() {
@Override
public void run() {
try {
socket.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 唤醒被阻塞的Selector
// select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
selector.wakeup();
}
Worker线程执行的操作
从同步队列中获取注册任务,并处理Read事件
实现代码
服务端
客户端
代码分析
我们调试代码的时候,如果注释掉selector.wakeup();这段代码,可以观察到,Worker中selector.select();这段代码如果先执行,整个程序会阻塞,无法执行到socket.register(selector, SelectionKey.OP_READ);这个代码。所以这时候需要selector.wakeup();去唤醒selector。
NIO与BIO
零拷贝
传统I/O
代码示例
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
过程详解
Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU。
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
图解DMA
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA
调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝
接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU。
用户态与内核态的切换
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
用户态与内核态的切换发生了 3 次,这个操作比较重量级
数据拷贝了共 4 次
NIO优化
方法
ByteBuffer.allocate(10)
底层对应 HeapByteBuffer,使用的还是 Java 内存
ByteBuffer.allocateDirect(10)
底层对应DirectByteBuffer,使用的是操作系统内存
优化点
Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用
内存回收过程
这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
当引用的对象ByteBuffer被垃圾回收以后,虚引用对象Cleaner就会被放入引用队列中,然后调用Cleaner的clean方法来释放直接内存
DirectByteBuffer 的释放底层调用的是 Unsafe 的 freeMemory 方法
通过专门线程访问引用队列,根据虚引用释放堆外内存
减少了一次数据拷贝,用户态与内核态的切换次数没有减少
零拷贝
linux 2.1以后
详情
底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
过程详解
Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝
最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU
方法
只发生了1次用户态与内核态的切换
数据拷贝了 3 次
linux 2.4以后
过程详解(linux 2.4)
Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU
用户态与内核态切换
整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次
定义
零拷贝指的是数据无需拷贝到 JVM 内存中
更少的用户态与内核态的切换
不利用 cpu 计算,减少 cpu 缓存伪共享
零拷贝适合小文件传输
定义
零拷贝指的是数据无需拷贝到 JVM 内存中
更少的用户态与内核态的切换
不利用 cpu 计算,减少 cpu 缓存伪共享
零拷贝适合小文件传输
底层原理分析
AIO
AIO 用来解决数据复制阶段的阻塞问题
同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
Windows 系统通过 IOCP 实现了真正的异步 IO
Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势
IO模型
分类
同步
线程自己去获取结果(一个线程)
异步
线程自己不去获取结果,而是由其它线程返回结果(至少两个线程)
数据读取的两个阶段
等待数据阶段
复制数据阶段
read系统调用
read系统调用过程
IO模型
阻塞IO
用户线程进行read操作时,需要等待操作系统执行实际的read操作,此期间用户线程是被阻塞的,无法执行其他操作
非阻塞IO
用户线程在一个循环中一直调用read方法,若内核空间中还没有数据可读,立即返回
只是在等待阶段非阻塞
用户线程发现内核空间中有数据后,等待内核空间执行复制数据,待复制结束后返回结果
多路复用
Java中通过Selector实现多路复用
当没有事件时,调用select方法会被阻塞住
一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用
多路复用与阻塞IO的区别
阻塞IO模式下,若线程因accept事件被阻塞,发生read事件后,仍需等待accept事件执行完成后,才能去处理read事件
多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行
异步IO
线程1调用方法后立即返回,不会被阻塞也不需要立即获取结果
当方法的运行结果出来以后,由线程2将结果返回给线程1
Netty入门
什么是Netty
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
Netty的优势
如果使用传统NIO,其工作量大,bug 多
需要自己构建协议
解决 TCP 传输问题,如粘包、半包
因为bug的存在,epoll 空轮询导致 CPU 100%
Netty 对 API 进行增强,使之更易用,如
FastThreadLocal => ThreadLocal
ByteBuf => ByteBuffer
入门案例
服务端
客户端
Netty关键组件
EventLoop
事件循环对象 EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件
它的继承关系如下
继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
继承自 netty 自己的 OrderedEventExecutor
提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup
事件循环组 EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
继承自 netty 自己的 EventExecutorGroup
实现了 Iterable 接口提供遍历 EventLoop 的能力
另有 next 方法获取集合中下一个 EventLoop 处理
关闭 EventLoopGroup
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。
处理普通与定时任务
处理IO任务
服务器
客户端
增加自定义EventLoopGroup
当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理。
不同的EventLoopGroup切换handler的实现原理
当handler中绑定的Group不同时,需要切换Group来执行不同的任务
如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用
源码
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor();
// 如果下一个EventLoop 在当前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor();
// 如果下一个EventLoop 在当前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
Channel
常用方法
close() 可以用来关闭Channel
closeFuture() 用来处理 Channel 的关闭
sync 方法作用是同步等待 Channel 关闭
addListener 方法是异步等待 Channel 关闭
pipeline() 方法用于添加处理器
write() 方法将数据写入
因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
writeAndFlush() 方法将数据写入并立即发送(刷出)
ChannelFuture
连接问题
拆分客户端代码
问题描述
如果我们去掉channelFuture.sync()方法,会服务器无法收到hello world
问题分析
这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端
解决方法
所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程
addListener方法
通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作
关闭channel
当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择两种方法来实现
通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
// 同步等待NIO线程执行完close操作
closeFuture.sync();
ChannelFuture closeFuture = channel.closeFuture();
// 同步等待NIO线程执行完close操作
closeFuture.sync();
调用closeFuture.addListener方法,添加close的后续操作
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel关闭后才执行的操作
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
});
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel关闭后才执行的操作
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
});
Future与Promise
netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口
jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
示例代码
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
示例代码
Netty中的Future对象,可以通过EventLoop的sumbit()方法得到
可以通过Future对象的get方法,阻塞地获取返回结果
也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果
netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
Handler与Pipeline
通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler
handler需要放入通道的pipeline中,才能根据放入顺序来使用handler
pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
具体结构
当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止
调用顺序
OutboundHandler触发时机
socketChannel.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找
ctx.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler
EmbeddedChannel
EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可
ByteBuf
调试工具方法
创建过程
ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小。
PooledUnsafeDirectByteBuf
当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作
如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建。
直接内存与堆内存
创建基于直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
池化与非池化
池化的最大意义在于可以重用 ByteBuf,优点有
没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
4.1 之前,池化功能还不成熟,默认是非池化实现
4.1 之前,池化功能还不成熟,默认是非池化实现
组成
组成部分
读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制。进行读写操作时,无需进行模式的切换
读指针前的部分被称为废弃部分,是已经读过的内容
读指针与写指针之间的空间称为可读部分
写指针与当前容量之间的空间称为可写部分
最大容量与当前容量
在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
常用方法
注意
这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
使用方法
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);
buffer.writeInt(5);
ByteBufUtil.log(buffer);
buffer.writeIntLE(6);
ByteBufUtil.log(buffer);
// 下面开始扩容
buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);
buffer.writeInt(5);
ByteBufUtil.log(buffer);
buffer.writeIntLE(6);
ByteBufUtil.log(buffer);
// 下面开始扩容
buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}
扩容
当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作
扩容规则
如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
如果写入后数据大小超过 512 字节,则选择下一个 2的n次方
例如写入后大小为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 已经不够了)
扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常
Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)
...
...
读取
读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针
如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置
还有以 get 开头的一系列方法,这些方法不会改变读指针的位置
释放
由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
每个 ByteBuf 对象的初始计数为 1
调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read
方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
异常处理原则
有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
while (!buffer.release()) {}
当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中
TailConext中释放ByteBuf的源码
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// 具体的释放方法
ReferenceCountUtil.release(msg);
}
}
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// 具体的释放方法
ReferenceCountUtil.release(msg);
}
}
判断传过来的是否为ByteBuf,是的话才需要释放
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
切片
ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针。
得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用。
修改原ByteBuf中的值,也会影响切片后得到的ByteBuf
优势
池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
读写指针分离,不需要像 ByteBuffer 一样切换读写模式
可以自动扩容
支持链式调用,使用更流畅
很多地方体现零拷贝,例如
slice、duplicate、CompositeByteBuf
应用
粘包与半包
粘包与半包
服务器代码
粘包现象
客户端代码
可见虽然客户端是分别以16字节为单位,通过channel向服务器发送了10次数据,可是服务器端却只接收了一次,接收数据的大小为160B,即客户端发送的数据总大小,这就是粘包现象。
半包现象
将客户端-服务器之间的channel容量进行调整
// 调整channel的容量
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍。
现象分析
粘包
现象
发送 abc def,接收 abcdef
原因
应用层
接收方 ByteBuf 设置太大(Netty 默认 1024)
传输层-网络层
滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包。
Nagle 算法:会造成粘包
半包
现象
发送 abcdef,接收 abc def
原因
应用层
接收方 ByteBuf 小于实际发送数据量
传输层-网络层
滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包。
本质
发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界
解决方案
短链接
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象。
客户端代码改进
修改channelActive方法
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 使用短链接,每次发送完毕后就断开连接
ctx.channel().close();
}
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 使用短链接,每次发送完毕后就断开连接
ctx.channel().close();
}
将发送步骤整体封装为send()方法,调用10次send()方法,模拟发送10次数据
public static void main(String[] args) {
// 发送10次
for (int i = 0; i < 10; i++) {
send();
}
}
// 发送10次
for (int i = 0; i < 10; i++) {
send();
}
}
客户端先于服务器建立连接,此时控制台打印ACTIVE,之后客户端向服务器发送了16B的数据,发送后断开连接,此时控制台打印INACTIVE,可见未出现粘包现象。
定长解码器
客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码,具体使用方法如下:ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
服务器代码
使用FixedLengthFrameDecoder对粘包数据进行拆分,该handler需要添加在LoggingHandler之前,保证数据被打印时已被拆分
// 通过定长解码器对粘包数据进行拆分
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
行解码器
行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的
可以通过LineBasedFrameDecoder(int maxLength)来拆分以换行符(\n)为分隔符的数据,也可以通过DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)来指定通过什么分隔符来拆分数据(可以传入多个分隔符)
两种解码器都需要传入数据的最大长度,若超出最大长度,会抛出TooLongFrameException异常
以换行符 \n 为分隔符
长度字段解码器
定义
在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的。
参数浅析
LengthFieldBasedFrameDecoder解码器可以提供更为丰富的拆分方法,其构造方法有五个参数
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)
参数解析
maxFrameLength 数据最大长度
表示数据的最大长度(包括附加信息、长度标识等内容)
lengthFieldOffset 数据长度标识的起始偏移量
用于指明数据第几个字节开始是用于标识有用字节长度的,因为前面可能还有其他附加信息
lengthFieldLength 数据长度标识所占字节数(用于指明有用数据的长度)
数据中用于表示有用数据长度的标识所占的字节数
lengthAdjustment 长度表示与有用数据的偏移量
用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
initialBytesToStrip 数据读取起点
读取起点,不读取 0 ~ initialBytesToStrip 之间的数据
参数详解
从0开始即为长度标识,长度标识长度为2个字节
0x000C 即为后面 HELLO, WORLD的长度
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |-----> | Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |-----> | Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
从0开始即为长度标识,长度标识长度为2个字节,读取时从第二个字节开始读取(此处即跳过长度标识)
因为跳过了用于表示长度的2个字节,所以此处直接读取HELLO, WORLD
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |-----> | Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |-----> | Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
长度标识前面还有2个字节的其他内容(0xCAFE),第三个字节开始才是长度标识,长度表示长度为3个字节(0x00000C)
Header1中有附加信息,读取长度标识时需要跳过这些附加信息来获取长度
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |-----> | Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |-----> | Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
从0开始即为长度标识,长度标识长度为3个字节,长度标识之后还有2个字节的其他内容(0xCAFE)
长度标识(0x00000C)表示的是从其后lengthAdjustment(2个字节)开始的数据的长度,即HELLO, WORLD,不包括0xCAFE
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |-----> | Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
lengthFieldLength = 3
lengthAdjustment = 2 (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |-----> | Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
长度标识前面有1个字节的其他内容,后面也有1个字节的其他内容,读取时从长度标识之后3个字节处开始读取,即读取 0xFE HELLO, WORLD
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |-----> | HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |-----> | HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
使用
代码
协议设计与解析
协议的作用
TCP/IP 中消息传输基于流的方式,没有边界
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
Redis协议
如果我们要向Redis服务器发送一条set name Nyima的指令,需要遵守如下协议
// 该指令一共有3部分,每条指令之后都要添加回车与换行符
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 下面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 下面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n
代码
HTTP协议
HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec作为服务器端的解码器与编码器,来处理HTTP请求
// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec
// Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec
代码
服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可
// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENT_LENGTH而一直空转,需要添加CONTENT_LENGTH字段,表明响应体中数据的具体长度
// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
自定义协议
组成要素
魔数
用来在第一时间判定接收的数据是否为无效数据包
版本号
可以支持协议的升级
序列化算法
消息正文到底采用哪种序列化反序列化方式
如:json、protobuf、hessian、jdk
指令类型
是登录、注册、单聊、群聊… 跟业务相关
请求序号
为了双工通信,提供异步能力
正文长度
消息正文
编码器与解码器
编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息。
编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到ByteBuf中
解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler
代码
MessageCode
LoginRequestMessage
Message
TestCodec
测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题
通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码
@Sharable注解
为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
但是并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder。
如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题
如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题
此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中
的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误
的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误
channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播
为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable注解来标明,该handler能否在多个channel中共享。
只有带有该注解,才能通过对象的方式被共享,否则无法被共享
自定义编解码器能否使用@Sharable注解?
这需要根据自定义的handler的处理逻辑进行分析
我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的
但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下
这就意味着ByteToMessageCodec不能被多个channel所共享的
原因:因为该类的目标是:将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过。所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题
如果想要共享,需要怎么办呢?
继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似
0 条评论
下一页