百万架构整体流程图
2020-11-13 17:33:36 0 举报
百万架构整体流程图
作者其他创作
大纲/内容
GroupSendService.create()
微信解密服务
模板方式推送信息
微信网关服务
判断Redis中是否存在key
PromotionPlanController.createPromotionPlan()
deal-customer-phone-server
参数:authorizerAppId调用数据库或缓存获取微信对象
1、使用Redisson获取Redis的写锁2、通过客户id获取客户上下文3、调用customerDeal()后释放写锁
isDuplicated()
GroupSendBatchService.filterFans()
SourceAndSinkSendGeneral
DB
更新Redis
7
MessageController.sendTemplatesMessage()
网关服务
ProducerTaskSyncCustomer.send()
traffic暴露接口
(1) 发送一条消息(2) 若失败进行重试,重试次数+1(3) 若重试次数大于预设阈值,退出
Kafka
订阅
规则服务
更新粉丝方法: checkSubTasksForSyncFansByOfficialAccountId(),每次更新一个task任务 (1) 更新一个task任务 (2) 若失败进行重试,重试次数+1 (3) 若重试次数大于预设阈值,退出
deal-open-weixin-server
CustomerSwarmService
接口: /app/{app-id}/member/{kuick-user-id}/customer/{customer-id}/official-account-template-messages参数: appId 项目id kuickUserId kuick用户id customerId 客户id officialAccountId 公众号id msgTemplateIds 消息模版id列表 templateVars 模板变量 templateProps 模板属性 sourceType 来源类型 sourceValue 来源值 templateMsg 模板消息文本
网站行为收集服务
消息幂等判断
deal-weixin-data-server
deal-api-server
deal-core2-server
GroupSendBatchService.get()
8
10
4
deal-customer-origin-server
数据库持久化存储
1、使用Drools开源规则引擎框架,执行规则2、持久化用户上下文,并更更新Redis
回调方法callback()
SourceAndSinkTaskSyncFans
1、从消息体中获取客户领域事件对象2、使用Redisson获取Redis的写锁3、通过客户id获取客户上下文4、调用customerDeal()后释放写锁
开始
1、扫描推广计划的二维码,成为新粉丝
send()
更新Customer数据源方法: checkTaskSyncCustomer(),每次更新一个task任务 (1) 更新一个task任务 (2) 若失败进行重试,重试次数+1 (3) 若重试次数大于预设阈值,退出
11
向数据库中持久化微信服务号群发实体对象:参数:GroupSend对象
EventProcessorForTimerTriggerEvent.process()
deal-robot-server
过滤粉丝方法: checkSubTasksForFilterFans(),每次更新一个task任务 (1) 更新一个task任务 (2) 若失败进行重试,重试次数+1 (3) 若重试次数大于预设阈值,退出
Nginx
GroupSendBatchService.doSendTemplate()
deal-behaviour-server
1、操作前端页面
6
客户溯源服务
QPS弹性伸缩
1、将内存中的字节流转换为img文件
上传文件到阿里云OSS服务,得到URL地址OSS服务优点:海量、安全、低成本、高可靠参数:appId newFileName 新文件名称inputStream 输入流
PromotionPlanService.handlePromotionPlanRule()
deal-weixin-decrypt-server
new GroupSend()
topic:customer_domain_event_v3
ConsumerTimerTriggerEvent.receive()
callback()
2
结束
发送模板消息
1、若规则已存在,则更新规则2、若规则不存在,则创建规则
DroolsCustomerDealEngineImpl.customerDeal()
行为服务deal-open-weixin-server
Http
deal-alert-server
K8S+Prometheus
过滤条件
消费者
1
保存事件
用户标签
监控
1、制定推广计划2、修改推广计划3、获取规则模板:velocity/rules/promotion_plan_reply.vm4、若规则已存在,更新规则,若规则不存在,创建新的规则
EventProcessAble.process()
ConsumerBehaviorLog.receive()
核心服务
创建客户
ProducerTaskFilterFans.send()
1、向微信用户发送消息
提醒服务
Source
获取公众号:参数:项目id: appId、公众号id: officialAccountId
1、从消息体中获取用户标签对象2、使用Redisson获取Redis的写锁3、通过客户id后获取客户上下文4、从客户上下文中获取标签集合并分类统计5、封装标签切换对象6、调用customerDeal()后释放写锁
1、触发规则,发送微信消息
1、从消息体中获取用户行为对象2、使用Redisson获取Redis的写锁3、通过客户id获取客户上下文4、CustomerService.queryBehaviourMetas()方法,发起Http请求获取行为元数据, 参数:行为对象的appId和action5、调用customerDeal()后释放写锁
dealWechatComponent.get(authorizerAppId)
1、获取从微信网关传递过来的经微信平台加密后的消息2、调用微信官方接口进行消息解密,解密成系统可使用的数据
判断公众号/小程序是否有效
根据原始ID获取公众号/小程序
ConsumerGroupSendByGeneral.receive()
1、订阅:behaviour_log_v3
EventProcessorForEventBehaviorLog.process()
客户领域事件
GroupSendBatchService.doTaskSyncCustomer()
GroupSendResource.create
微信开放平台相关服务
向数据库中持久化微信订阅号群发实体对象:参数:GroupSend对象
服务号批量发送:参数:GroupSend对象、GroupSendBatch对象采用异步的方式发送:调用 asyncTaskProcessBatch.doTask()
接口:/official-account/{official-account-id}/promotion-plans校验参数,核心参数有:appId officialAccountId 公众号Id name 推广计划名称
规则服务deal-robot-server
核心服务deal-core2-server
CustomerServiceImpl
ConsumerTaskFilterFans.receive()
判断是订阅号或者是服务号
GroupSendBatchService.doSyncFansByOfficialAccount()
发送客户领域事件流程
删除客户
修改客户
IdempotenceComponent.setex()
1、调用deal-core2-server服务2、推广计划创建客户
异步发布-订阅模块设计
参数:appId调用核心服务获取License对象,判断公众号或小程序是否过期
msg封装Event对象。通过Set集合实现白名单过滤
PromotionPlanService.createPromotionPlan()
CustomerDomainEventConsumer.receive()
微信
客户分群订阅客户领域事件流程
分类处理 发送Kafka消息
1、订阅:topic_wechat_events_proceed_00
HTTP
微信平台
1、操作前台页面,创建、修改、合并客户
ConsumerDealUserTagEvent.receive()
核心模块
groupSendBatchService.doTask()
上传文件方法参数:appId userId 用户iduserType 用户类型(如deal用户 和kuick用户)fileName 文件名称size 文件大小fileType 文件类型(如图片、视频)inputStream 输入流
生产者
微信网关
GroupSendService.send()
topic:customer_domain_event_v3判断是否幂等(Redis):idempotenceComponent.hasKey(key))
ConsumerTaskSyncFans.receive()
同步客户
1、创建客户:createCustomer()1、修改客户:updateCustomer()1、合并客户:mergeCustomer()
合并Customer数据源1、收集来源数集合GroupSendBatchService.getSyncCustomerConditionByBatch() 若集合为空,则代表发送全部客户,记录BatchStep状态为INTERSECT_CUSTOMER_FAILED,并退出合并方法 若集合大小为1,则标识唯一来源客户源,不需要合并,重命名key即可 若集合大小大于1,则标识客户源从多种方式获得,需执行过滤唯一Key操作,防止重复发送现象。2、若合并成功,则记录BatchStep状态为INTERSECT_CUSTOMER_SUCCEED 若合并失败,则记录BatchStep状态为INTERSECT_CUSTOMER_FAILED
GroupSendService.save()
BatchCalculateCallback.callback()
定时计划事件
groupSendBatchService.create();
gw-wechat-event-kafka-server
参数:message 消息内容signatrue 签名串timeStamp 时间戳nonce 随机字符串说明: 基于SHA1算法对消息进行解密
ProducerGroupSendByTemplate.send()
客户手机号来源服务
GW微信网关
1、执行业务逻辑
9
阿里云OSS服务
事件类型白名单过滤
5
纯文本、图片、图文方式推送信息
ConsumerCustomerDomainEvent.receive()
GroupSendBatchService.doSendGeneral()
EventProcessor.process()
删除需要过滤的粉丝的Redis key
decrypt()
SourceAndSinkTaskFilterFans
asyncTaskProcessBatch.doTask()
发布
同步粉丝1、通过客户找粉丝,GroupSendBatchService.syncFansByOfficialAccount()。2、异步分task任务将需要推送消息的粉丝存入Redis,每个task任务10000条3、若同步成功,则记录BatchStep状态为SYNC_FANS_SUCCEED 若同步失败,则记录BatchStep状态为SYNC_FANS_FAILED
Html2ImgService.convert()
同步粉丝
deal-file-server
SourceAndSinkTaskSyncCustomer
分群
GroupSendBatchService.syncFans()
ProducerGroupSendGeneral.send()
距上次活跃时间
ProducerTaskSyncFans.send()
1、调用deal-robot-server服务,创建或者修改推广计划规则2、如果规则创建成功,则将最新的推广计划配置同步到数据库内
GroupSendBatchService.intersectCustomer()
ConsumerTaskSyncCustomer.receive()
上传文件类型:图片、音频、视频、普通文件、文章
过滤粉丝
是
3
查询
1、发布:topic_wechat_events_proceed_00
Redis
插入
1、接收kafka行为消息2、将行为消息封装成一个key
解密
群发接口
sum(irate(nginx_server_requests{code=\"total\
create()
用户行为
微信推送服务入口:接口:/{app_id}/official-account/{official_account_id}/group-sends1、判断推送消息内容类型是否符合内置标准,不符合则抛出异常 消息内容类型:0:纯文本; 1:图片; 2、多图文; 4、模板
Redis.tryLock()实现消息幂等判断
dealCore2Component.getAppLicense(appId)
修改推广计划
1、更新群发状态2、根据消息类型选择发送方式:异步 类型: (1)纯文本、图片、图文 producerGroupSendGeneral.send(msg); (2)模板 producerGroupSendByTemplate.send(msg);
否
1、封装推广计划对象,2、存储到数据库
数据库持久化事件
行为服务
1、调用行为服务
创建客户:handleCreate()更新客户:handleUpdate()删除客户:handleDelete()
server { listen 8081; location /manage/traffic-status { vhost_traffic_status_display; vhost_traffic_status_display_format html; }}
handleMerge()
writeHtml()
接口:/official-account/{official-account-id}/promotion-plan/{promotion-plan-id}校验参数,核心参数有:appId officialAccountId 公众号Id promotionPlanId 推广计划id name 推广计划名称 msgTemplateId 消息模版id newFansPush 新粉丝推送 autoReplyMsgTemplateIds 自动回复消息模版id fansLimitCount 限制数量 overLimitReplyMsgTemplateIds 超限回复消息模版id increaseType 助力类型,1:仅新粉丝;2:所有粉丝 endMsgTemplateIds 推广计划结束时,推送的模板消息
合并客户分群 (1)删除分群中旧成员 (2)在指定分群中添加新成员 (3)同步Redis
MessageSendLogService.sendOfficialAccountTemplatesMessage()
1、将html文件转换为字节流保存在内存中
微信公众号推送服务内部流程图
topic:timer_event_v5判断是否幂等(Redis):idempotenceComponent.hasKey(key))
PromotionPlanService.updatePromotionPlan()
Event&Set集合
ConsumerGroupSendByTemplate.receive()
1、发布:behaviour_log_v3
GroupSendBatchService.doFilterFans()
创建推广计划
1、根据模板创建规则
订阅号发送:参数:GroupSend对象1、封装消息体2、调用restTemplate远程调用向客户订阅号发送消息
PromotionPlanController.updatePromotionPlan()
1、发布:topic_wechat_events_v10_00
html2Img()
用户
CustomerResource
CustomerEngine
Topic:behaviour_log_v3判断是否幂等(Redis):idempotenceComponent.hasKey(key))
html2img-server
IdempotenceComponent.hasKey()
ConsumerBehaviorLog.receive()
12
EventProcessorForCustomerDomainEvent.process()
EventProcessorForDealUserTagChange.process()
FileUploadComponent.ossUpload()
GroupSendBatchService.syncCustomer()
订阅号
参数:id: authorizerAppid 授权方appidsignature 签名串timeStamp 时间戳nonce 随机字符串content 消息内容
同步Customer数据源span style=\"font-size: inherit;\
1、客户的创建、修改、删除2、发送领域事件
封装key
save()
topic:topic_deal_user_tag判断是否幂等(Redis):idempotenceComponent.hasKey(key))
FileService.upload()
0 条评论
回复 删除
下一页