Netty 入门与实战:仿写微信 IM 即时通讯系统
2019-01-26 14:22:07 27 举报
AI智能生成
netty
作者其他创作
大纲/内容
仿微信 IM 系统简介
netty实战实现简单的命令行形式的单聊和群聊
Netty 是什么?
IO编程
关键示例
IOServer.java
ServerSocket serverSocket = new ServerSocket(8000)
InputStream inputStream = socket.getInputStream()
Server 端首先创建了一个serverSocket来监听 8000 端口,然后创建一个线程,线程里面不断调用阻塞方法 serversocket.accept();
IOClient.java
Socket socket = new Socket("127.0.0.1", 8000);
socket.getOutputStream().write((new Date() + ": hello world").getBytes());
系统庞大后存在的问题
线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
线程切换效率低下:单机 CPU 核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
IO 编程中,我们看到数据读写是以字节流为单位。
NIO 编程
如何解决 NIO 问题的
线程资源受限:NIO 模型中 selector模式
线程切换效率低下:由于 NIO 模型中线程数量大大降低,线程切换效率因此也大幅度提高
IO 读写是面向流的,一次性只能从流中读取一个或者多个字节,并且读完之后流无法再读取,你需要自己缓存数据。 而 NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可。
总结
Netty 封装了 JDK 的 NIO,让你用得更爽,你不用再写一大堆复杂的代码了。 用官方正式的话来说就是:Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
启动流程
服务端启动流程
创建两个NioEventLoopGroup
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup表示监听端口,accept 新连接的线程组,workerGroup表示处理每一条连接的数据读写的线程组
创建了一个引导类 ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
.group(bossGroup, workerGroup)给引导类配置两大线程组
.channel(NioServerSocketChannel.class)来指定 IO 模型
调用childHandler()方法,给这个引导类创建一个ChannelInitializer,后续增加业务逻辑的入口
绑定端口serverBootstrap.bind(8000);
客户端启动流程
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
指定线程模型
.group(workerGroup)
.group(workerGroup)
指定 IO 类型为 NIO
.channel(NioSocketChannel.class)
.channel(NioSocketChannel.class)
IO 处理逻辑
.handler(new ChannelInitializer<SocketChannel>() {}
.handler(new ChannelInitializer<SocketChannel>() {}
建立连接
bootstrap.connect("juejin.im", 80)
bootstrap.connect("juejin.im", 80)
数据传输载体 ByteBuf
基于读写指针和容量、最大可扩容容量,衍生出一系列的读写方法,要注意 read/write 与 get/set 的区别
多个 ByteBuf 可以引用同一段内存,通过引用计数来控制内存的释放,遵循谁 retain() 谁 release() 的原则
客户端与服务端通信协议编解码
通信协议的设计
服务端与客户端交互,双方协商出来的满足一定规则的二进制数据格式
编码
Java 对象根据协议封装成二进制数据包的过程成为编码
public ByteBuf encode(Packet packet) {
// ....
return byteBuf;
}
public ByteBuf encode(Packet packet) {
// ....
return byteBuf;
}
// 1. 创建 ByteBuf 对象
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer()
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer()
这里我们调用 Netty 的 ByteBuf 分配器来创建,ioBuffer() 方法会返回适配 io 读写相关的内存,它会尽可能创建一个直接内存,直接内存可以理解为不受 jvm 堆管理的内存空间,写到 IO 缓冲区的效果更高
// 2. 序列化 Java 对象
byte[] bytes = Serializer.DEFAULT.serialize(packet);
byte[] bytes = Serializer.DEFAULT.serialize(packet);
// 3. 实际编码过程
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
解码
二进制数据包中解析出 Java 对象的过程成为解码
public Packet decode(ByteBuf byteBuf) {
// 跳过 magic number
byteBuf.skipBytes(4);
// 跳过版本号
byteBuf.skipBytes(1);
// 序列化算法标识
byte serializeAlgorithm = byteBuf.readByte();
// 指令
byte command = byteBuf.readByte();
// 数据包长度
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
Class<? extends Packet> requestType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);
if (requestType != null && serializer != null) {
return serializer.deserialize(requestType, bytes);
}
return null;
}
// 跳过 magic number
byteBuf.skipBytes(4);
// 跳过版本号
byteBuf.skipBytes(1);
// 序列化算法标识
byte serializeAlgorithm = byteBuf.readByte();
// 指令
byte command = byteBuf.readByte();
// 数据包长度
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
Class<? extends Packet> requestType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);
if (requestType != null && serializer != null) {
return serializer.deserialize(requestType, bytes);
}
return null;
}
pipeline 与 channelHandler
pipeline
https://www.jianshu.com/p/6efa9c5fa702
netty在服务端端口绑定和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,pipeline像是可以看作是一条流水线,原始的原料(字节流)进来,经过加工,最后输出
channelHandler
就是在 pipeline 管道里面流动的元素
类
ChannelInboundHandler
ChannelOutBoundHandler
ChannelOutBoundHandler
inBoundHandler 的执行顺序与我们实际的添加顺序相同,而 outBoundHandler 则相反。
ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
接收上一个 handler 的输出,这里的 msg 就是上一个 handler 的输出,msg 对象其实就是 ByteBuf
默认情况下 adapter 会通过 fireChannelRead() 方法直接把上一个 handler 的输出结果传递到下一个 handler
ChannelOutboundHandlerAdapter
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
这个 adapter 也会把对象传递到下一个 outBound 节点,它的传播顺序与 inboundHandler 相反
特殊的 handler
ByteToMessageDecoder
无论我们是在客户端还是服务端,当我们收到数据之后,首先要做的事情就是把二进制数据转换到我们的一个 Java 对象,所以 Netty 很贴心地写了一个父类,来专门做这个事情
自动管理ByteBuf申请的堆外存
SimpleChannelInboundHandler
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
// 业务逻辑
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
// 业务逻辑
}
}
类型的转换都自动处理完成,不需要手动编码
MessageToByteEncoder
基于 MessageToByteEncoder,我们可以实现自定义编码,而不用关心 ByteBuf 的创建,不用每次向对端写 Java 对象都进行一次编码
生命周期
handlerAdded() :指的是当检测到新连接之后,调用 ch.pipeline().addLast(new LifeCyCleTestHandler()); 之后的回调,表示在当前的 channel 中,已经成功添加了一个 handler 处理器。
channelRegistered():这个回调方法,表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程建立了绑定关系
channelActive():当 channel 的所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经添加完所有的 handler)以及绑定好一个 NIO 线程之后,这条连接算是真正激活了,接下来就会回调到此方法
channelRead():客户端向服务端发来数据,每次都会回调此方法,表示有数据可读
channelReadComplete():服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕
拆包粘包理论与解决方案
为什么会有这样子的现象
TCP 底层和Netty应用层 不对等的关系
拆包原理
如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包
如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
自带拆包器
固定长度的拆包器 FixedLengthFrameDecoder
行拆包器 LineBasedFrameDecoder
分隔符拆包器 DelimiterBasedFrameDecoder
基于长度域拆包器 LengthFieldBasedFrameDecoder
参考https://www.jianshu.com/p/a0a51fd79f62
心跳与空闲检测
在某一端(服务端或者客户端)看来,底层的 TCP 连接已经断开了,但是应用程序并没有捕获到,因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开
两大问题
对于服务端来说,因为每条连接都会耗费 cpu 和内存资源,大量假死的连接会逐渐耗光服务器的资源,最终导致性能逐渐下降,程序奔溃
对于客户端来说,连接假死会造成发送数据超时,影响用户体验
连接假死原因
应用程序出现线程堵塞,无法进行数据的读写
客户端或者服务端网络相关的设备出现故障,比如网卡,机房故障
公网丢包。
服务端:何为空闲检测?
空闲检测指的是每隔一段时间,检测这段时间内是否有数据读写,简化一下,我们的服务端只需要检测一段时间内,是否收到过客户端发来的数据即可
IdleStateHandler
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接");
ctx.channel().close();
}
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接");
ctx.channel().close();
}
客户端:定时发心跳
服务端没收到消息可能存在假死现象,也有可能客户端确实没法送消息,
为了避免第二种情况,所以客户端采用心跳机制,不断给服务端发送心跳
为了避免第二种情况,所以客户端采用心跳机制,不断给服务端发送心跳
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.executor().scheduleAtFixedRate(() -> {
ctx.writeAndFlush(new HeartBeatRequestPacket());
}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
super.channelActive(ctx);
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.executor().scheduleAtFixedRate(() -> {
ctx.writeAndFlush(new HeartBeatRequestPacket());
}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
super.channelActive(ctx);
}
通常空闲检测时间要比发送心跳的时间的两倍要长一些,这也是为了排除偶发的公网抖动,防止误判
作者soulcoder
www.soulcoder.tech
www.soulcoder.tech
0 条评论
下一页