Netty核心技术
2024-08-24 21:27:59 0 举报
AI智能生成
Netty核心技术
作者其他创作
大纲/内容
BIO编程
I/O模型 简单理解就是用什么样的通道进行数据的发送和接收, 很大程度上决定了程序通信的性能
Java 支持三种网络编程模式I/O
BIO--同步并阻塞(传统阻塞型)
服务器实现模式为一个连接一个线程, 即客户端有连接请求时服务器就需要启动一个线程进行处理, 如果这个连接不做任何事情,就会造成不必要的线程开销
应用场景: BIO适用于连接数目较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中, JDK1.4之前唯一选择,但程序简单易于理解
NIO--同步非阻塞
服务器实现模式一个线程处理多个请求(连接), 即客户端发送的连接请求都会注册到多路复用器(selector)上, 多路服用器轮询到连接有I/O请求就进行处理
应用场景: NIO适用于连接数目比较多且连接比较短的架构,比如聊天服务器,弹幕系统,服务器间通讯等编程比较复杂,JDK1.4之后开始支持
AIO--异步非阻塞(了解)
AIO引入异步通道的概念, 采用了 Proactor 模式, 简化了程序编写, 有效的请求才启动线程, 它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接较多且连接时间较长的应用
应用场景: AIO适用于连接数目比较多且连接比较长的架构, 比如相册服务器,充分调用os参与并发操作,编程比较复杂,JDK7开始支持
BIO(Block IO)源代码
server
client
telnet 127.0.0.1 端口
ctrl +] 进入命令行模式 send xx
可以在控制面板->程序->打开或关闭window->勾选 telnet client
NIO(New IO)
Channel (通道)
NIO的通道类似于流, 但又不同, 通道可以同时进行读写,而流只能读或只能写, 通道可以实现异步读写数据,可以从缓冲区读数据,也可以从缓冲区写数据
FileChannel 主要用于对本地文件进行IO操作
read() 从通道读取数据并放入到缓冲区
write() 把缓存区的数据写入到通道中
transferFrom() 从目标通道中复制数据到当前通道
transferTo() 把数据从当前通道复制给目标通道
代码展示
Buffer (缓冲区)
缓冲区本质是一个可以读写数据的内存块, 可以理解为一个容器对象(含数组), 该对象提供了一组方法, 可以更轻松地使用内存块
缓冲区对象内置了一些机制, 能够跟踪和记录缓冲区的状态变化情况, Channel 提供从文件,网络读取数据的渠道, 但是读取或写入数据都必须由 Buffer
Buffer 类定义了所有缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息
mark
标记
position
位置,下一个要被读或写的元素的索引,每次读写缓冲区都会改值,为下次读写准备
limit
表示缓冲区的当前终点, 不能对缓冲区超过极限的位置进行读写操作, 且极限是可以修改的
capacity
容量, 即可以容纳的最大数量,在缓冲区创建时被设定且不能改变
Buffer 和 Channel 注意事项和细节
1), ByteBuffer 支持类型话的 put 和 get , put 放入的是 什么数据类型,get 就应该用相应的类型来取出, 否则就会有 异常信息BufferUnderflowException
2), 可以将一个普通的 Buffer 转成只读的 Buffer,asReadOnlyBuffer()方法
3), NIO 提供了 MappedByteBuffer ,可以让文件直接在内存中(堆外的内存)中进行修改,操作系统不需要拷贝一份 而如何同步到文件由NIO来完成
4), NIO还支持通过多个Buffer (即Buffer 数组) 完成读写操作, 即 Scattering[分散] 和 Gatering []
Selector (选择器)
1), Java 的NIO ,用非阻塞的IO方式, 可以用一个线程,处理多个客户端连接, 就会使用 selector (选择器)
2), Selector 能够监测多个注册通道上是否有事件发生(注意多个Channel 以事件的方式可以注册到同一个selector ) ,如果有事件发生, 便获取事件然后针对每个事件进行相应处理, 这样就可以只用一个单线程去管理多个通道, 也就是管理多个连接和请求
3), 只有在连接/通道真正有读写事件发生时, 才会进行读写, 就大大减少了系统开销, 并且不必为每个连接都创建一个线程,不用去维护多个线程
4), 避免了多线程之间的线程上下文切换的开销
java.nio.channels.Selector
open() 得到一个选择器对象
select() 监听所有注册的通道,当其中有IO操作可以进行时, 将对应的SelectorKey 加入到内部集合中返回, 参数用来设置超时时间
selectedKeys() 返回此选择器的选定键集
selectNow() 不阻塞,立马返回
wakeup() 唤醒selector
keys() 返回此选择器的键集
NIOServer
NIOClient
SelectionKey 表示Selector 和网络通道的注册关系
OP_READ = 1 << 0 读(1)
OP_WRITE = 1 << 2 写(4)
OP_CONNECT = 1 << 3 连接(8)
OP_ACCEPT = 1 << 4 接受(16)
selector() 得到与之关联的selector 对象
channel() 得到与之关联的通道
attachment() 得到与之关联的共享数据
interestOps(x) 设置或改变监听事件
ServerSocketChannel 在服务器监听新的客户端 socket连接
open() 得到一个 ServerSocketChannel 通道
bind(x) 设置服务器端口号
configureBlocking(x) 设置阻塞或非阻塞模式 ,取false 表示采用非阻塞模式
accept() 接受一个连接, 返回代表这个连接通道对象
register() 注册一个选择器并设置监听事件
SocketChannel 网络IO通道,具体复杂进行读写操作,NIO把缓冲区的数据写入通道,或者把通道数据读到缓冲区
open() 得到一个socketChannel 通道
configureBlocking(x) 设置阻塞或非阻塞,取值false 表示采用非阻塞模式
connect(x) 连接服务器
finishConnect() 如果上面的方法连接失败,接下来要通过该方法完成连接操作
write(x) 向通道写入数据
read(x) 从通道读数据
register() 注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
close() 关闭通道
NIO群聊系统
要求
1), 编写一个NIO群聊系统, 实现服务器端和客户端之间的数据简单通讯(非阻塞)
2), 实现多人群聊
3), 服务器端: 可以监测用户上线,离线,并实现消息转发功能
4), 客户端: 通过 channel 可以无阻塞发送消息给其它用户, 同时可以接收其它用户发送消息
5), 目的: 进一步理解NIO非阻塞的网络编程机制
服务端
客户端
NIO与零拷贝
基本介绍
1), 零拷贝是网络编程的关键, 很多性能优化都离不开
2), 在Java 程序中, 常用的零拷贝有 mmap(内存映射) 和 sendFile
mmap优化
通过内存映射, 将文件映射到内核缓冲区, 同时, 用户空间可以共享内核空间数据, 这样 ,在进行网络传输时, 就可以减少内核空间到用户空间的拷贝次数
sendFile 优化
Linux 2.1提供了 SendFile 函数, 其根本原理如下, 数据根本不经过用户态, 直接从内核缓冲区进入到 socketBuffer 同时, 由于和用户态完全无关, 就减少了一次上下文切换
注意: 零拷贝是从操作系统角度看,是没有cpu拷贝的,因为内核缓冲区之间, 没有数据是重复的,
零拷贝不仅仅带来更少的数据复制, 还能带来其它性能优势, 例如更少的上下文切换, 更少的CPU缓存伪共享以及CPU效验和计算
mmap 和 sendFile 区别
1), mmap 适合小数据量读写, sendFile 适合大文件传输
2),mmap 需要4次上下文切换, 3次数据拷贝 , sendFile 需要3次上下文切换,最少2次的数据拷贝
3), sendFile 可以利用DMA 方式,减少CPU拷贝, mmap 则不能( 必须从内核拷贝到 socket缓冲区)
零拷贝服务器
零拷贝客户端
Netty概述
为什么有了NIO之后还会有Netty
1), NIO的 类库API繁杂, 需要熟练掌握 Slector , ServerSocketChannel 和 SocketChannel ,ByteBuffer 等
2), 需要具备其他的额外技能, 需熟悉 Java多线程,因为NIO 编程涉及 Reactor 模式, 你必须对多线程和网络编程非常熟悉, 才能编写出高质量的NIO程序
3), 开发工作量和难度非常大,例如客户端面临断连重连, 网络闪断, 半包读写 ,失败缓存,网络拥塞 和异常流的处理等等
4), JDK NIO的Bug ,例如臭名昭著的Epoll Bug ,它会导致selector 空轮询, 最终导致cpu 100%, 知道JDK7版本该问题仍然存在,没有根本解决
概述
1), Netty 是由 jboss 提供的一个Java 开源框架, 现为Github 上的独立项目
2), Netty 是一个异步的,基于事件驱动的网络应用框架, 用以快速开发高性能,高可用的网络IO程序
3),Netty主要针对在TCP协议下, 面向 client 端的高并发应用, 或者 peer-to-peer 场景下的大量数据持续传输的应用
4), Netty 的本质是NIO框架, 适用于服务器通讯相关的多钟应用场景
优点
Netty 对JDK自带的NIO的api进行了封装解决了一下问题
1), 设计优雅, 适用于各种传输类型的统一API阻塞和非阻塞 socket, 基于灵活且可扩展的事件模型,可以清晰地分离关注点, 高度可定制的线程模型, 单线程, 一个或多个线程池
2), 使用方便, 详细记录的 javadoc ,用户指南和示例, 没有其它依赖项
3), 高性能,吞吐量更高, 延迟更低,减少资源消耗,最小化不必要的内存复制
4), 安全,完整的 SSL/TLS 和 startTL5 支持
5), 社区活跃,不断更新, 版本更新迭代短, 发现的bug 可以被及时修复,同时, 更多的新功能会被加入进来
应用场景
互联网行业: 在分布式系统中,各个节点之间需要远程服务调用,高性能RPC框架必不可少,Netty 作为异步高性能的通信框架, 往往作为基础通信组件被这些RPC框架使用
1), 阿里云分布式服务框架 Dubbo 的RPC框架是哟昂 Dubbo 协议进行节点的通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信
2), 1),游戏行业,无论手游服务器还是端游服务器,java语言得到了越来越广泛的应用,2, Netty 作为高性能的基础通信组件, 提供了 TCP/UCP和HTTP协议栈, 方便定制和开发私有协议栈,账号登录服务器 3, 地图服务器之间可以方便的通过 Netty 进行高性能的通信
3), 大数据领域 1-经典的Hodoop的高性能通信和序列化组件(AVRO)的 RPC 框架, 默认采用Netty 进行跨界通信 . 2-它的Netty Service 基于Netty 框架二次封装实现
Netty高性能架构设计
线程模型基本介绍
传统阻塞I/O服务模型
特点
采用阻塞IO模式获取输入的数据
每个连接都需要独立的线程完成数据的输入,业务处理
问题
当并发数很大, 就会创建大量的线程, 占用很大系统资源
连接创建后, 如果当前线程暂时没有数据可读, 该线程会阻塞在 Read 操作, 造成线程资源浪费
Reactor (感应器/分发者/通知者)模式
1), 基于IO复用模型: 多个连接共用一个阻塞对象, 应用程序只需要在一个阻塞对象等待, 无需阻塞等待所有连接, 当某个连接有新的数据可以处理时, 操作系统通知应用程序,线程从阻塞状态返回, 开始进行业务处理
2), 基于线程池复用线程资源, 不必在为每个连接创建线程, 将连接完成后的业务处理任务分配给线程进行处理, 一个线程可以处理多个连接的业务
说明
1), Reactor 模型,通过一个或多个输入同时传递给服务处理器的模型(基于事件驱动)
2), 服务器端程序会处理传入的多个请求,并将它们同步分派到相应的处理线程,因此 Reactor 模式也叫 Dispathcher 模式
3), Reactor 模式使用 IO 复用监听事件, 收到事件后分发给某个线程,这点就是网络高并发处理关键
Reatcor 核心组成
1), Reactor : Reactor 在一个单独的线程中运行, 负责监听和分发事件, 分发给适当处理程序来对 IO 事件做出反应, 它像公司的电话接线员, 他接听来自客户电话并将线路转到适当的联系人
2), Handlers : 处理程序执行IO 事件要完成的实际事件, 类似于 客户想要与之交谈的公司中的实际官员, Reactor 通过适当的处理程序来响应 IO 事件, 处理程序执行非阻塞的操作
根据Reactor 的数量和处理资源池数量不同,有3中典型的实现
单Reactor单线程
单Reactor 多线程
主从Reactor 多线程
Netty 线程模式(Netty 主要基于主从Reactor 多线程模型做了一定的改进, 其中主从 Reactor 多线程模型有多个Reactor)
NettyServer
NettyServerHandler
NettyClient
NettyClientHandler
Netty --TaskQueue自定义任务
1), 用户程序自定义的普通任务
2), 用户自定义定时任务
3), 非当前Reactor 线程调用 Channel 的各种方法
Netty -- 异步模型
基本介绍
1), 异步的概念和同步相对, 当一个异步过程调用发出后,调用者不能立刻得到结果, 实际处理这个调用的组件在完成后, 通过状态, 通知和回调来通知调用者
2), Netty 中的IO操作是异步的, 包括Bind ,Write ,Connect 等操作会简单的返回一个 ChannelFuture
3),调用者并不能立刻获取结果,而是通过 Future-Listener 机制, 用户可以方便的主动获取或者通过通知机制获取IO操作结果
4), Netty 的异步模型建立在 future 和 callback 的之上的, callback 就是回调, 重点说 Future ,它的核心思想是: 假设一个方法 fun, 计算过程非常耗时, 等待fun 返回显然不太合适, 那么可以在调用 fun 时候, 立马返回一个 future ,后续可以通过 Future 去监控方法 fun 的处理过程
Future -Listener机制
当 Future 对象刚刚创建起来, 处于非完成状态, 调用者可以通过返回 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作
常用api
isDone() 判断当前操作是否完成
isSuccess() 判断已完成的当前操作是否成功
getCause() 获取已完成的当前操作失败的原因
isCancelled() 判断已完成的当前操作是否被取消
addListener() 注册监听器, 当操作已完成(isDone 方法返回完成), 将会通知指定的监听器, 如果 Future 对象已完成, 则通知指定监听器
快速入门实例-Http服务
要求
1), Netty服务器在 6668端口监听, 浏览器发出请求,http://localhost:7777/
2),服务器可以回复给客户端 指定的一串消息, 并对特定的服务器进行过滤
HttpServer
HtppServerHandler
Netty--核心组件
Bootstrap : 引导,一个Netty 应用通常由一个Bootstrap 开始, 主要作用是配置整个 Netty 程序,串联各个组件, Netty 中 Bootstrap 类是客户端程序的启动引导类
ServerBootstrap : 是服务端启动引导类
常见api说明
1), handler() 该handler 对应 bossGroup
2), childHandler() 该handler 对应 workGroup
3), bind() 用于服务端,用来设置占用的端口号
4), connect() 用户客户端, 用来连接服务器
5), channel() 该方法用来设置一个服务端的通道实现
6), option() 用来给 ServerChannel 添加配置
Future, ChannelFuture : Netty 中所有的IO操作都是异步的, 不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures ,它们可以注册一个监听, 当操作执行成功或失败时监听会自动触发注册的监听事件
常见api说明
channel() 返回当前正在进行IO操作的通道
sync() 等待异步操作执行完毕
Channel
1), Netty 网络通信的组件,能够用于执行网络IO操作
2), 通过Channel 可获得当前网络连接的通道的状态
3), 通过Channel 可获得网络连接的配置参数
4), Channel 提供异步的网络IO操作(建立连接,读写,绑定端口)异步调用意味着任何IO调用都将立即返回,并且不能保证在调用结束时所请求的IO操作已完成
5), 调用立即返回一个 ChannelFuture 实例, 通过注册监听器到 ChannelFuture 上, 可以IO操作成功, 失败或者 取消时回调通知调用方
6), 支持IO操作与对应的处理程序
7), 不同协议, 不同的阻塞类型的连接都有不同的, Channel 类型与之对应, 常见的Channel类型有
1), NIOSocketChannel 异步客户端 TCP Socket 连接
2), NIOServerSocketChannel 异步的服务端TCP Socket 连接
3), NioDatagramChannel 异步的UDP连接
4), NioSctpChannel 异步的客户端 Sctp 连接
5), NioSctpServerChannel 异步的Sctp 服务器端连接, 这些通道蕴含了 UDP 和 TCP 网络IO以及文件IO
Selector
1), Netty 基于 Selector 对象实现 IO 多路复用, 通过 Selector 一个线程可以监听多个连接的 Channel 事件
2), 当向一个 Selector 中注册 Channel 后, selector 内部的机制就可以自动不断查询 (select) 这些注册的 Channel 是否已有就绪的 IO事件(例如可读,科协,网络连接完成等),这些程序就可以很简单的使用一个线程高效地管理多个Channel
ChannelHandler
1), ChannelHandler 是一个接口, 处理IO事件或拦截IO操作,将其转发到其它ChannelPipeline (业务处理链) 中的下一个处理程序
2), ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现, 方便使用期间,可以继承它的子类
ChannelPipeline 是一个重点
1), 是一个Handler的集合, 它负责处理和拦截 inbound 或者 outbound 的事件和操作, 相当于一个贯穿 Netty的链,(也可以这样理解, ChangePipeline 是保存 ChannelHandler 的List ,用于处理或拦截 Channel 的入栈事件或出栈事件)
2), ChannelPipeline 实现了一种高效形式的拦截过滤器模式, 使用户可以完全控制事件的处理方式, 以及 Channel 中各个的 ChannelHandler 如何相互交互
常见api
addFirst() 把一个业务处理类(Handler)添加到链中的第一个位置
addLast() 把一个业务员处理类(Handler) 添加到链中的最后一个位置
Netty -- EventLoop组件
ChannleHandlerContext
1), 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
2), 即ChannelHandlerContext 中包含一个具体的事件处理器 ChannelHandler 同时 ChannelHandlerContext 中也绑定了对应的 pipeline 和 channel 的信息,方便对 ChannelHandler 进行调用
常用api
close() 关闭通道
flush() 刷新
writeAndFlush() 将数据写入 channelPipeline 当中并刷新当前通道
ChannelOption
Netty 在创建Channel 实例后, 一般都需要设置 ChannelOption 参数
ChannelOption.SO_BACKLOG
对应TCP/IP协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小,服务器端处理客户端连接请求时顺序处理的, 所以同一时间只能处理一个客户端连接,多个客户端来的时候, 服务端将不能处理的客户端连接请求放入队列进行等待处理, backlog 参数指定队列的大小
ChannelOption.SO_KEEPALIVE
一直保持连接活动的状态
EventLoopGroup
EventLoopGroup 是一组 EventLoop 的抽象, Netty 为了更好的利用多核CPU资源, 一般会有多个 EventLoop 同时工作, 每个 EventLoop 维护着一个 Selector 实例
EventLoopGroup 提供 next 接口, 可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务, 在Netty 服务器编程中, 我们一般都需要提供两个 EventLoopGroup
常用api
NioEventLoopGroup() 构造方法
shutdownGracefully() 断开连接,关闭线程
Unpooled
Netty提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类
Netty-- 群聊应用实例
要求
1), 编写一个 Netty 群聊系统, 实现服务端和客户端之间的数据简单通讯(非阻塞)
2), 实现多人群聊
3), 服务器端: 可以检测用户上线, 离线 , 并实现消息转发功能
4), 客户端: 通过 channel 可以无阻塞发送消息给其它用户,同时接收其它用户发送的消息
服务端
服务端处理器
客户端
客户端处理器
Netty -- 心跳监测机制
要求
1), 编写一个Netty 心跳监测机制案例, 当服务器超过3秒没有读时, 就提示读空闲,
2), 当服务器超过5秒没有写操作时,就提示写空闲
3), 实现当服务器超过7秒没有读或写操作时,就提示读写空闲
服务器
服务器处理器
Netty -- 通过WebSocket 实现服务器和客户端长连接
要求
1), Http 协议是无状态的 , 浏览器和服务器间的请求响应一次, 下次会重新创建连接
2), 要求, 实现基于webstocket 的长连接的全双工的交互
3), 改变 Http 协议多次请求的约束, 实现长连接了,服务器就可以发送消息给浏览器
4), 客户端浏览器和服务器端会相互感知,比如服务器关闭了, 浏览器会感知, 同样浏览器
长连接服务器
长连接handler
hello.html
Netty -- Google Protobuf(谷歌协议)
编码和解码的基本介绍
编写网络应用程序时, 因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接受数据时就需要解码
codec(编解码器) 的组成部分由两个 : decoder(解码器) 负责把字节码数据转换成业务数据 和 encoder (编码器) 负责将业务数据转换成字节码数据
Netty 自身编解码的机制和问题分析
Netty提供的编码器
StringEncoder : 对字符串数据进行编码
ObjectEncoder 对java 对象进行编码
Netty 提供的解码器
StringDecoder 对字符串进行解码
ObjectDecoder 对java对象进行解码
Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现对 POJO(简单) 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术, 而Java 序列化技术本身效率不高, 存在如下问题
1), 无法跨语言
2), 序列化后的体积太大,是二进制编码的5倍多
3), 序列化性能太低
Protobuf 基本介绍
Protobuf 是Google 发布的开源项目, 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据串行化, 或者说序列化, 它很适合做数据存储或 RPC[远程过程调用] 数据交换格式
支持跨平台, 跨语言即[客户端和服务器可以是不同的语言编写的](目前支持 大多数语言, 例如C,C++,Java ,python等)
高性能,高可靠性
使用Protobuf 编译器能自动生成代码, Protobuf 是将类的定义使用 .proto 文件进行描述
通过 protoc,exe 编译器根据 .proto 自动生成.java文件
Protobuf 使用
引入 pom
student.proto
UserInfo.proto
下载 protoc.exe
protoc.exe --java_out . student.proto
NettyCoderServer
NettyCoderServerHandler
NettyClient
NettyClientHandler
Netty -- 编解码器和Handler的调用机制
基本介绍
1), Netty 的组件设计: Netty 的主要组件有 Channel , EventLoop , ChannelFurure , ChannelHandler , ChannelPipe 等
2), ChannelHandler 充当了处理入栈和出栈数据的应用程序逻辑容器
3), ChannelPipe 提供了 ChannelHandler 链的容器
编码解码器
1), 当Netty 发送或者接受一个消息的时候, 就将会发生一次数据转换, 入栈消息会被解码, 从字节转换成为另一种格式(比如java 对象),如果是出栈消息, 它会被编码成字节码
2), Netty提供一系列实用编码解码器, 它们都实现了 ChannelInboundHandler(解码) 或者 ChannelOutboundHandler(编码) 接口
总结
1), 不论解码器 handler 还是编码器 handler 即接受的消息类型必须与待处理的消息类型一致,否则该handler 不会被执行
2), 在解码器进行数据解码时, 需要判断 缓存区 ByteBuf 的数据是否足够, 否则接收的结果会期望结果可能不一致
Netty -- 整合Log4j
1). pom.xml
2),resource/log4j.properties
TCP 沾包和拆包及解决方案
基本介绍
1), TCP是面向连接的, 面向流的, 提供高可靠性服务 ,收发两端(客户端和服务端) 都要有一一成对的 socket ,因此 ,发送端为了将多个发给接收端的包,更有效的发给对方, 使用了优化方法( Nagle算法) ,将多次间隔较小且数量小的数据, 合并成了一个大的数据块, 然后进行封包 这样做虽然提高了效率, 但是接收就难于分辨出完整的数据包了, 因为面向流的通信使无消息保护边界的
2), 由于TCP 无消息保护边界, 需要在接收端消息边界问题, 也就是我们所说的粘包,拆包问题
代码问题产生
服务端
服务端handler
客户端
客户端handler
解决方案
1), 使用自定义协议+ 编码器来解决
2), 关键就是要解决 服务端每次读取数据长度的问题 , 这个问题解决, 就不会出现服务器读多读或者少读数据的问题, 从而避免tcp 粘包和拆包的问题
代码展示
服务器
服务器handler
客户端
客户端handler
Decoder
Encoder
Netty --核心源码剖析
前提说明
源码需要剖析到 Netty 调用 doBind 方法 ,追溯到 NioServerSocketChannel 的 doBind
并且要Debug 程序到 NioEventLoop 类, 的run代码,无限循环,在服务器运行
1), pom.xml
2), 使用 io.netty.example.echo 下面的 EchoServer 和 EchoClient 案例
Netty启动源码说明
1), 先看启动类, main 方法中, 首先创建了关于SSL 的配置类
2), 重点分析创建了两个 NioEventLoopGroup
1), 这两个对象是整个 Netty 的核心, 可以说, 整个Netty 的运作都依赖与它们, bossGroup用于接收Tcp 请求, 他会将请求交给 workGroup ,workGroup 会获取到真正的连接, 然后和连接进行通信, 比如读写解码编码等
2), EventLoopGroup 是事件循环组(线程组) 含有多个 EventLoop , 可以注册 channel ,用于在事件循环中去进行选择(和选择器相关)
3), new NioEventLoopGroup(1); 这1表示bossGroup 有1个线程你可以指定, 如果 new NioEventLoopGroup() 会含有默认线程 cpu核数*2 , 即可以利用多核优势
4), 继续向里面走 children = new EventExecutor[nThreads]; 会创建 EventExecutor[x]数组
children[i]元素的类型都是 NioEventLoop , NioEventLoop 实现了 EventLoop 接口 ,其中 EventLoop 继承 EventExecutor
children 的类型是 EventExecutor 所以可以放入NioEventLoop
3), 创建了一个 ServerBootStrap();
1), 创建了一个b 对象, 他是一个引导类, 用于启动服务器和整个程序的初始化
2), b 和ServerChannel 关联, 而ServerChannel 继承了 Channel 有一些方法 例如 remoteAddress等可以使用
3), 变量b 调用group 方法,将bosssGroup 赋值给自己的父类prentGroup, workGroup 赋值给自己的 childGroup
4), 然后添加了一个Channel ,其中参数是一个 Class 对象, 引导类通过这个 对象反射创建 ChannelFactory , 然后添加了一些 TCP参数
5), 在添加了一个服务器专属日志处理器 handler
6), 再添加一个 SocketChannel 的handler
7), 然后在绑定端口并阻塞至连接成功
8), finally 块中的代码将在服务器关闭时优雅关闭所有资源
4),绑定端口分析
查找到 doBind(xx)方法
initAndRegister();
channel = channelFactory.newChannel();
1), 通过Nio 的 SelectorProvider 的 openServerSocketChananel 方法得到JDK的 channel ,目的就是让 Netty 包装JDK的 channel
2), 创建一个唯一的 ChannelId ,创建一个NioMessageUnsafe , 用于操作消息,创建了一个 DefaultChannelPipeline 管道, 是双向链表结构, 用于过滤所有的进出消息
3), 创建了一个NioServerSocketChannelConfig 对象,用于对外展示一些配置
init(channel);
1), init方法,这是个抽象方法(AbstractBootstrap类的), 由ServerBootStrap 实现
2), 设置NioServerSocketChannel 的tcp属性
3), 由于LinkedHashMap 是非线程安全的,使用同步进行处理
4), 对NioServerSocketChannel 的 ChannelPipeline 添加 ChannelInitializer 处理器
5), 可以看出 init() 核心作用 和 ChannelPipeline相关
6), 从NioServerSocketChannel 的初始化过程中, 我们知道 pipeline 是一个双向链表, 并且, 他本身就初始化了 head 和 tail 这里调用了他的 addList 方法,也就是将整个head 插入到 tail 的前面,因为 tail 永远在后面, 需要做一些系统的工作
doBind0();
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
AbstractChannelHandlerContext 类的 next.invokeBind(localAddress, promise)
DefaultChannelPipeline 类的 unsafe.bind(localAddress, promise)
NioServerSocketChannel 类的 doBind 方法
接收请求过程源码剖析
服务器启动肯定是要接收客户端请求并返回客户端想要的信息
processSelectedKeys()
processSelectedKeysOptimized()
processSelectedKey(SelectionKey k, AbstractNioChannel ch)
AbstractNioMessageChannel 类的 read()
1), assert eventLoop().inEventLoop() 检查该 EventLoop 线程是否是当前线程
2), 执行 doReadMessages(readBuf); 方法, 并传入一个 readBuf 变量, 这个变量是一个list 也就是容器
1), 通过工具类,调用NioServerSocketChannel 内部封装, serverSocketChannel 的accept 方法,这是Nio做法
2), 获取到一个JDK的 SocketChanenl 然后使用 NIO 进行封装,最后添加到List容器中
3), 这样容器buf 就有了 NioSocketChannel
3), 循环容器, 执行 pipeline.fireChannelRead(readBuf.get(i));
4), doReadMessage 是读取boss线程中的 NioServerSocketChannel 接收到的请求,并把这些请求放进容器
5), 循环遍历, 容器中的所有请求, 调用 pipeline 的 fireChannelRead 方法, 用于处理这些接收的请求或者其它事件, 在 read 方法中, 循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法 ,开始执行管道中的 handler 的ChannelRead 方法
ServerBootstrap 类的 channelRead方法
Netty 接收请求过程源码梳理
接收请求 -> 创建一个新的NioSocketChannel -> 注册到一个work EventLoop上 -> 注册到 selector Read 事件上
1), 服务器轮询 Accept 事件, 获取事件后调用 unsafe.read() 方法, 这个unsafe 是ServerSocket 的内部类,该方法内部由2部分组成
2), doReadMessages() 用于创建NioSocketChannel 对象,该对象包装JDK 的Nio Channel 客户端,该方法会像创建 ServerSocketChannel 类似创建相关的 pipeline , Unsafe , config
3), 随后执行 pipeline.firstChannelRead 方法, 并将自己绑定到一个 channel 选择器上的 workGroup 中的 一个EventLoop ,并且注册一个0,表示注册成功, 但并没有注册读(1)事件
pipeline 与 Handler 和 HandlerContent创建源码剖析
三者关系
1), 每当ServerSocket 创建一个新的连接,就会创建一个 socket 对应的就是目标客户端
2), 每一个新创建的 Socket 都将会分配一个全新的 ChannelPipeline
3), 每一个 ChannelPipeline 内部都含有多个 ChannelHandlerContext
4), 它们一起组成了双向链表, 这些 Context 用于包装我们调用 addLast 方法时添加的 ChannelHandler
ChannelPipeline
这是一个 handler 的 List , handler 用于处理或拦截入栈事件和出栈事件, pipeline 实现了过滤器的高级形式, 以便用户控制事件如何处理以及 handler 在 pipeline 中如何交互
如图描述了一个典型的 handler 在 pipeline 中处理IO事件的方式, IO事件由 inBoundHandler 或者 outBindHandler 处理, 并通过调用 ChannelHandlerContent.firstChannelRead 方法转发给最近的处理程序
入栈事件由入栈处理程序自下而上的方向处理, 入栈处理程序通常处理由底部的IO线程生成入栈数据, 入栈数据通常从SocketChannel.read(byteBuf)获取
通常一个pipeline 有多个handler ,例如 一个典型的服务器在每个通道中都会有以下处理程序
协议解码器--将二进制数据转为java对象
协议编码器--将java对象转为二进制数据
业务处理程序 --执行实例业务逻辑
你的业务不能将线程阻塞, 会影响IO性能, 进而影响整个Netty 程序的性能, 如果你的业务程序很快, 就可以放在IO线程中,反之,你需要异步执行, 或者在添加 handler 的时候添加一个线程池
ChannelHandler
handlerAdded() 当把ChannelHandler 添加到 pipeline 时被调用
handlerRemoved() 当从pipeline 中移除使用
exceptionCaught() 当处理过程中发生异常时调用
ChannelHandler 的作用是处理IO事件或拦截IO事件, 并将其转发给下一个处理程序 ChannelHandler,Handler 处理事件时分入栈和出栈的, 两个方法的操作都是不同的, 因此Netty 定义了两个子接口继承 ChannelHandler
ChannelHandlerContent
ChannelHandlerContent 不仅仅继承了它们两个方法(入栈和出栈),同时也定义了一些自己的方法
这些方法能够获取 ChannelHandlerContent 上下文环境中对应的比如 Channel ,Excutor ,Handler , Pipeline 内存分配器, 关联 handler 是否被删除
ChannelHandlerContent 就包装了 handler 相关的一切, 以方便 ChannelHnadlerContent 可以在 pipeline 方便的操作 Hanlder
ChannelPipeline | ChannelHandler | ChannelHandlerContent 创建过程
1.1), 任何一个 ChannelSocket 创建的同时都会创建一个 pipeline
1.2), 当用户或系统内部调用 pipeline 的 add** 方法添加 handler 时, 都会创建一个包装这 handler 的 ChannelHandlerContent
1.3),这些 ChannelHandlerContent 组成了双向链表
2.1), Socket 创建的时候创建 pipeline
DefaultChannelPipeline(Channel channel)
1), 创建一个 future 和 promise 用于异步回调使用
2), 创建一个 inbound 和 tailContent 创建一个即是 inbound 类型又是 outbount 类型的 headContent
3), 最后将这两个 ChannelHandlerContent 互相连接, 形成双向链表
4), tailContent 和 headContent 非常重要,所有 pipeline 中的事件都需要流经它们
2.2), add** 添加处理器的时候创建 Context**
1), pipeline 添加 handler ,参数是线程池, name是null, handler 是我们活着系统传入的 handler ,Netty 为了防止多个线程导致安全问题, 同步了这段代码
2), 检查这个 handler 实例是否共享的,如果不是 ,并且已经被别的 pipeline 使用了, 则抛出异常
3), 调用 newContext(group, filterName(name, handler), handler); 方法,创建一个 Context 从这里可以看出, 每次添加一个 handler 都会创建一个关联的 Context
4), 调用 addLast方法 ,将 context 追加到链表中
5), 如果这个通道还没有注册到 selecor 上, 就将这个 Context 添加到这个 pipeline 的代办任务中,就注册好了以后, 就会调用 callHandlerAdded0(newCtx); --默认是什么都不做,用户可以实现这个方法
到这里,针对3创建过程, 了解的差不多了, 和最初说的一样, 没当创建 ChannelSelecor 的时候都会创建一个绑定的 pipeline ,一对一的关系, 创建 pipeline 的时候也会创建 tail 和 head 节点,形成了最初的链表, tail 是入栈 inboud类型的handler ,head 是出栈outbound 的类型, 在调用pipeline 的 addLast 方法的时候, 会根据给定的 handler 创建一个 context 然后将这个context 插入到链表的尾端tail前面即可
ChannelPipeline 调度 Channelhandler 的源码剖析
1), 这些都是出栈的实现, 但是调用的是 outbound 类型的 tail handler 来进行处理, 因为这些都是 outbound 事件
2), 出栈是tail 开始,入栈从 head 开始, 因为出栈是从内部外面写, 从tail开始, 能够让前面的 handler 进行处理, 防止 handler 被遗漏, 比如编码
3), 入栈是从 head 往内部输入, 让后面的 handler 能够处理这些输入的数据, 比如解码, 因此虽然实现了 outbound 接口,但不是从 head 开始执行输出任务
Netty心跳(hearbeat)服务源码剖析
目的
1),Netty 作为一个网络框架,提供了诸多功能, 比如编码解码等
2), Netty 还提供了非常重要的一个服务--心跳机制 hearbeat ,通过心跳检查对方是否有效, 这是RPC框架中是不可少的功能
说明
Netty 提供了 idStateHandler ,ReadTimeoutHandler ,WriteTimeoutHandler 三个handler 检查连接的有效性
idStateHandler
当连接的空闲时间(读或写)太长时, 将会触发一个 idStateHandler 事件,然后,你可以通过你的 ChannelInboundHandler 中重写 useEventTrigged 方法来处理该事件
ReadTimeoutHandler
如果在指定的事件没有读操作时, 就抛出异常,通过重写 ChannelInboundHandler 的 exceptionCaught 方法
WriteTimeoutHandler
如果在指定的事件没有写操作时, 就抛出异常,通过重写 ChannelInboundHandler 的 exceptionCaught 方法
IdleStateHandler
重要属性
private final boolean observeOutput; //是否考虑出栈时较慢的情况,默认是 false
private final long readerIdleTimeNanos; //读事件空闲时间,0是禁用该事件
private final long writerIdleTimeNanos;//写事件空闲时间,0是禁用该事件
private final long allIdleTimeNanos;//读或写空闲时间,0是禁用该事件
initialize(ChannelHandlerContext ctx)
只要给定的参数大于0, 就创建一个定时任务, 每个事件都创建,同时,将 State 状态设置为1,防止重复初始化,调用 initOutputChanged 方法, 监控出栈数据属性
ReaderIdleTimeoutTask
1), 得到用户设置超时时间
2), 如果读操作结束了(执行了 ChannelReadComplete 方法设置) , 就用当前时间减去给定时间最后一次读操作的时间(执行 channelReadComplete 方法设置),如果小于0, 就触发事件, 反之继续放入队列, 间隔时间是新的计算时间
3), 触发的逻辑是,首先将任务再次放到队列, 时间是刚开始设置的时间, 返回一个 promise 对象, 用于取消操作,然后设置 first 属性为 false表示, 下一次读取不在是第一次了,这个属性在 channelRead 方法会被改成true
4), 创建一个 IdStateEvent 类型的写事件对象, 将次对象传递给用户的 userEventTriggered 方法完成触发事件的操作
5), 总的来说, 每次读取操作都会记录一个时间, 定时任务时间到了, 会计算当前时间和最后一次读的时间的间隔, 如果间隔超过了设置的时间,就触发 userEventTriggered 方法
WriterIdleTimeoutTask
写任务的代码逻辑基本和读任务的代码逻辑一样, 唯一不同的就是有一个针对 出栈较慢数据的判断 hasOutputChanged(ctx, first)
AllIdleTimeoutTask
1), 表示这个监控所有的事件,当读写事件发生时, 都会记录, 代码了逻辑和写事件基本一致
2), 当前时间减去最后一次读或写,若大于0 说明超时了
3), 这里的时间计算是取最多大值来的, 然后像写事件一样, 判断是否发生了写的慢的情况
EventLoop 源码剖析
1),ScheduledExecutorService 接口是一个定时任务接口, EveentLoop 可以接收定时任务
2), EventLoop 接口, Netty 接口文档说明该接口作用, 一旦 Channel 注册了, 就处理该 Channel 对应的所有IO操作
3), SingleThreadEventExecutor 表示这是一个单个线程的线程池
1), 首先判断EventLoop 的线程是否是当前线程,如果是直接添加到任务队列中, 如果不是,则尝试启动线程(由于线程是单个的,因此只能启动一次),随后再将任务添加到队列中去
2),如果线程结束,并且删除任务失败,则执行拒绝策略,默认是抛出异常
3), 如果 addTaskWakesUp 是 false ,并且任务不是 NinWakeupRunable 类型的, 就尝试唤醒 selector 这个时候,阻塞在 selector 的线程就会立即返回
4), EventLoop 是一个单例线程池, 里面含有一个死循环的线程不断的做着3件事情, 监听端口, 处理端口事件, 处理队列事件, 每个EventLoop 都可以绑定多个Channel ,而每个Channel 始终只能由一个 EventLoop 来处理
handler 中加入线程池和 Context 中添加线程池的源码剖析
目的
1), Netty源码中做耗时的,不可预料的操作, 比如数据库,网络请求,会严重影响Netty 对Socket 的处理速度
2),而解决办法就是将耗时任务添加到异步线程池中, 但就添加线程池这步操作来讲, 可能有以下俩个方式
1), handler 中加入handler
2), handler 中添加线程池
3), context 中添加线程池
这两种方式比较
第一种在handler 中添加异步, 可以更加自由, 比如如果需要访问数据库,那我就异步, 如果不需要,则不异步, 异步会拖长接口响应时间, 因为需要将任务放入 mpscTask 中,如果IO时间很短, task 很多可能一个循环下来, 都没时间执行整个 task 导致响应时间不达标
第二种方式是 Netty 标准方式, 但是,那么做会将整个 handler 都交给业务线程池, 不论耗时不耗时, 都加入到队列中, 不够灵活
各有优势劣势,从灵活性考虑,第一种较好
Netty -- RPC(remote procedure call)远程过程调用
基本介绍
1),是一个计算机通信协议,该协议允许运行于一台计算的程序调用另一台计算机的子程序, 而程序员无需额外地为这个交互作用编程
2),两个或多个应用程序都分布在不同的服务器上, 它们之间调用都像本地方法调用一样
3), 常见的RPC框架有: 阿里Dubbo ,google的gRPC , spring 的 springcloud
使用 Netty编写 dubbo RPC(基于Netty)
需求说明
1), dubbo 底层使用 Netty 作为网络通讯框架, 要求用Netty 实现一个简单的 RPC 框架
2), 模仿 dubbo 消费者和提供者约定接口和协议, 消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据
服务端
客户端
netty服务端
netty服务端handler
netty客户端
netty客户端handler
服务接口
接口实现类
0 条评论
下一页