01,netty实现集群分布式IM客服系统
2021-04-27 16:39:52 6 举报
真实的IM客服系统架构,里面涉及netty集群及netty底层原理、mq解决分布式问题、以及分布式锁的实现、业务流程图
作者其他创作
大纲/内容
channel传送数据到buffer
workerGroup处理请求的线程池
客服登录
用户登录
如果是用户
心跳检测使用
还没进用户状态机,当前只是通道层面的逻辑判断
客服更改状态(上线,下线)
不为空,不用判断type直接执行响应事件
退出等待
客服状态
用户
customerStartEntryEvent
是
ServicerConversationForceAllocateEvent
推送评价申请
会话转移
exceptionCaught方法通道异常关闭
通知用户您已结束排队
创建
ServicerWorkingCallSessionTransEvent
用户发送消息
同样是发送mq
更新用户等待时间(做统计报表)
客服
当前服务的所有用户断线
customerRobotRequestManualEvent
用户当前状态
发生异常,通道断开会走channelInactive方法
客服也会出现这个情况
分配等待中的客户
监听事件和通知事件
超时提醒用户
再请求
执行事件
EventLoop
通知客服重复登录重新登录
聊天时结束服务
servicerCtxEntryMap客服本地缓存
生成
WebSocket是一种在单个TCP连接上进行全双工通信的协议例如:ws://127.0.0.1:10000/wsWebsocket协议是基于HTTP协议的,但是Websocket支持长连接
达到用户无感知的效果之后进入状态机
返回聊天记录补齐断线过程中丢失的聊天信息
请求人工服务执行事件
同时会创建上下文当参数传入各个handler来使用
等待重连超时
CustomerWaitingState用户等待状态
通知目标客服
覆盖新通道(value)
customerRobotState机器人状态
由于断开其他服务通道的mq消息没及时消费
channelReadComplete方法正常情况下读操作完成时被调用
发送mq
变更状态
通知
在线客服信息
CustomerServiceState用户被服务状态
channelRead0获取信息并处理
无论重复不重复
响应发送给客服端,验证会话个数相当于通知客服端有多少个会话
socketChannel
用户锁屏,app端服务进程被杀
servicerOfflineState客服下线状态
假如旧通道在其他服务器发mq关闭旧客服通道
机器人状态
获取断线前所接待的会话id(List)
断线期间用户结束会话
通知客服用户已经断线离开
一般就一个线程
转发
机器人会话
在线的客服列表
app端wifi切换4G
通信
创建新会话
若对方不是同一个服务器
新旧通道皆断开通道
重复登录
客服发送消息
event事件不为空
超时配置总时长,超出则断开用户
不允许重复请求人工
并发场景
客服端异常后重连
请求ES
关闭用户老通道
CustomerEndState用户结束状态
结束服务逻辑
上传七牛云CDN获取图片url
返回answer
用户IOS端
发送mq,,其他微服务消费的方式
CustomerState执行handle()
用户结束服务事件customerDisconnectEvent
异常重连/网络切换重连
ChannelInBoundHandler(入站)事件的处理者
MQ
用户pc
存入
正常超时
线程个数由cpu核数决定
ES服务器
分配结果
时间间隔1分钟刷新
直接发送到本地ctx
继承
存入本地机器
通知客服和用户首语
根据当前状态调用不同方法
key:serverId
判断用户端重复登录null != user
用户H5
IO多路复用的机制(select、poll、epoll)linux使用的是epoll,采用事件驱动,epoll使用mmap减少复制开销
ChannelPipeline 为 ChannelHandler 链提供了容器可以添加多个ChannelHandler到channelpipeline,会按顺序执行,链式调用
bind
保存聊天记录
加锁,防止用户收到“您已结束排队”,结果却被接待
服务器B(与服务器A组成集群)
获取信息
ServicerReconnectEvent
用户安卓端
回退到机器人状态
打印日志比如:交互信息,状态等等主要供调试bug
监听和通知
缓存redis:IM_CTX_CACHE_PROCESS加expire过期
客服结束会话
提示用户连接断开
客服注销
从buffer获取信息
5,清理用户资源()
分配成功
加锁处理
是否重连type=(10013)
ServicerState执行handle()
客服pc
ChannelPipeline 绑定事件处理服务
其他服务器
服务器A(与其他服务器集群部署)
发送mq至目标服务器:达到定点发送的效果问题:假如有多台netty服务器组成集群,客服要发送消息给用户,假如两者不在同一个服务器,则需要由mq作为中间件。处理消息的流转:但是,mq要准确的发送给目标服务器,不能发送给所有的其他服务器去消费。比如客服连接服务器1,发送给某个用户,而用户通道连接至服务器3,则mq客服发送到某个队列,只能服务3去消费这个队列。不然的话假如所有服务器都消费,则造成冗余消费,导致mq性能问题。解决方案:1,首先,用户和客服登录时,对象存入redis,对象里包含当前连接服务器的ip地址,当重连时,假如ip发生变化,则加锁,及时更改redis中的值。2,发送:先获取对方ip,然后发送,this.amqpTemplate.send(\"amq.topic\
CustomerWaitingExitEvent
ServicerSendEvent
请求人工客服
发送至MQ
主从Reactor多线程模型
ping消息
项目中用的分布式锁问题:因netty服务器集群部署,业务上交互信息都交给redis处理,固在多个场景出现并发问题。所以项目中用到分布式锁场景举例:场景1,等待中的用户,假如用户正在被分配,用户又结束等待,同时用户异常断线清理资源场景2,客服转移当前会话给另外一个客服,同时用户结束服务,又用户重复登录,或者通道异常断线清理资源分布式锁实现方案:基于高并发的考虑,采用redis方案,也可借鉴zk实现方案font color=\"#3333ff\
其他机器消费消息,关闭旧通道ctx
仅仅关闭通道不清理资源
赋初始状态customerStartState
判断客服是否本地CTX
用户结束服务
服务器本地内存
CustomerRobotSendEvent
servicerWaitFinToWorkingEvent
无客服在线
Schedule任务
bossGroup接受请求accept事件的线程池负责处理TCP/IP连接的,socketChannel的建立
ServerSocketChannel(监听新的客户端 Socket 连接)
ByteBuf缓冲区
各种对应关系
ServicerWaitFinishState等待离线中的结束会话
用户/客服状态
ServicerOfflineState下线状态
客服退出了,有可能系统异常,内存中依然有值
事件处理
否
聊天
记录客服离线时间点
waitFinFinUserSession
不做处理
由于每个eventloop是串行调用多个channel的事件,所以要保证事件内的业务逻辑处理要快,否则会出现线程内阻塞,甚至导致服务假死
用户4G请求人工,等待排队后断网,客服接入这个等待用户,会触发
发送至本地ctx
获取到文本信息后进行业务处理进入状态机进行流转
判断用户是否本地CTX
channelActive建立连接
返回客户端Pong消息
4G切wifi
分配等待中的用户
value:ctx上下文
WebSocketFrame聊天信息
mongoDB
每个eventloop负责监听多个channel且事件串行调用执行
加锁避免客服下线以及用户下线/已分配造成的并发冲突
走断开事件清理资源
监听到的事件由handler处理
wifi切4G
向客户推送评价申请
监听到连接后调用
记录客服上线时间点(记录客服在线时长,考勤用)
写回数据
FullHttpRequest握手请求
踢掉旧用户
查看用户是否全部处理
异常断线重连
消费
ServicerWaitFinishState
清理冗余本地内存map数据
key:userId
判断request中type值获取响应事件去执行
发送至
workingFinUserSession
微服务
用户端
1,发送断线消息通知用户
异常缓存redisIM_CTX_CACHE_EXCEPTION加expire过期
超时断开
登录请求建立连接
客服状态机iMServicerStateMachine.fire(request);
Handler添加到容器
是否图片
新旧通道皆提示重复登录,并断开重新登录一次为了提示客服,账号存在泄露密码的风险,及时改密码
通知老通道的客户端重复登录
ChannelPipeline: 类似spring容器,存放bean,在netty里,管理事件,以及链式调用的处理顺序ChannelHandlerContext: 类似spring的上下文,在netty里,管理 ChannelHandler 和 ChannelPipeline 的交互ChannelHandler(事件): 类似spring容器里的bean
CustomerWorkingExitEvent
1,是否已经排队中
结束服务
2,分配客服
各种缓存信息
异常断线
判断
进入事件说明已经重连成功重连后
反向代理
发生异常
Redis
更新user对象set机器人会话id
等待重连
赋初始状态servicerStartState
handlerRemoved方法钩子方法当ChannelHandler从一个ChannelPipeline中移除时被调用
用户不是说的最后一句话,则进行用户超时退出
发送首语
达到客服无感知的效果之后进入状态机
判断客服重复登录null != servicer
反射的方式创建
等待时退出队列
ServicerWorkingState
重复
关闭客服老通道
判断是否为用户登录
客服端
ServerBootstrap(用于启动netty服务)
网络切换
abstract ImState(根据type流转到相应event)
会话强制分配
用户退出app账户
用户状态机iMCustomerStateMachine.fire(request);
ServicerWorkingAcceptSessionEvent
通道是否在本机
修改映射 用户映射到新客服
接受或拒绝会话转移
nginx
用户/客服正常断开逻辑
通知APP端重连成功
3,更新用户最后一次活跃时间(用于超时断线)
客服当前状态
app端异常重连
可一次分配多个等待中的用户(建立会话)customerWaitingAssginEvent
客服端获取丢失的用户聊天记录
3,断开用户的ctx
若一直不重连及redis缓存过期
消费mq
提示子下所有用户断开连接进行重连
删除等待队列中的用户
servicerWorkingState客服工作状态
类似情况都是这种处理模式,判断用户和客服是否同一个服务器,若不是则发送mq,另一端消费mq获取消息在处理
客服登出,清理相关资源
无网到有网
wifi切wifi
是否正常退出
用户结束服务的业务逻辑
发送mq关闭其他服务的老通道
当客户端和服务端连接的时候会建立一个socketChannel负责基本的 IO 操作,例如:bind(),connect(),read(),write()
EventLoop 线程
channel从缓冲区获取数据
用户/客服对象信息
清除冗余数据
客服断线重连
端口
ServicerWorkingState正在服务状态
清理客服资源
用户华为P10,切后台,20秒后触发
旧会话状态修改
判断客服当前状态
文本信息
handlerAdded握手请求
如果是客服
ServicerWaitFinishState等待下线状态(想下线,当前有用户建立会话,但不再分配新用户)
封装消息发送
当前状态不变servicerWorkingState或ServicerWaitFinishState
触发客服再分配
其他情况
4,保存断线类型至mongo
重连
断开接待中的用户
用户注销
如果已经全部处理完毕设置客服为离线状态
servicerOfflineToOnlineEvent
不提示用户重复登录,为了提高用户的体验感,因为用户可能有多个手机端登录现象
业务状态机(运用状态模式)
各种限制参数
通知客服用户退出
还没进客服状态机,当前只是通道层面的逻辑判断
创建服务记录
请求丢失的聊天记录
用户是否已经被分配
判断是否为客服登录
userCtxEntryMap用户本地缓存
swap日志任务
提示用户无客服在线
通知用户连接机器人成功
等待队列
创建与客服聊天新会话
客服满线,进入等待
关闭老通道
ChannelHandlerContext上下文(之后简称为ctx)
CustomerWorkingSendEvent
客服pc端
查看子下所有用户是否全部处理
向用户推送评价申请
ThreadLocalhandshake()握手
登录/等待消息体
0 条评论
下一页