pass 交互数据接入层逻辑
2021-08-12 10:48:21 0 举报
流程图
作者其他创作
大纲/内容
调用排线模型计算接口参数直接取排线模型执行计划表的模型输入(对应接口内容见:pass排线模型接口文档 1)
更新排线模型执行计划(t_lines_execute_plan)计划状态:启动模型失败
是
调方案中心:(接口详情见:Confluence接口文档4.1)状态2;//建模任务执行中并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
调用查询是否需要计算预测件量接口(接口详情见:Confluence 6.2)
调方案中心:(接口详情见:Confluence接口文档4.1)状态-2; //内核执行失败并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
模型输入组装等待事件的处理
更新排线模型执行计划(t_lines_execute_plan)模型输入和计划状态为就绪
调方案中心:状态 -4;//模型内核处理超时
更新模型结果通知(t_lines_msg_receive)处理状态为已处理
查询mysql中排线模型执行计划表(t_lines_execute_plan)中pass_msg_receive_time在25小时内并且计划状态为执行中执行计划取(pass执行消息接收时间)最新的一条数据
否
模型输入resultSingleRouteKeysPrefix =方案id/方案id_SingleStepLine_
在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
整体流程
通知数据中心方案执行成功
结束
查询mysql中 排线消息接收表(t_lines_msg_receive)是否存在方案id相同的模型任务启动数据
将上一步的od货量数据总数、单批条数、批次数、oss的keys 作为计算的结果的原数据发KAFKA的EOS_DRSM_CORE_LINES_CREATE_BATCH_FLOW_RESULT主题
将上一步得到数据的发KAFKA,并将状态修改为待满足
Spark消费计算OD货量中间态的数据KAFKA消息
接收PASS方案中心请求启动方案计算
通知数据中心模型计算成功(接口详情见:Confluence接口文档6.2)并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
更新排线模型执行计划表的模型输出
参数是否检验通过
调用模型开始计算
接收PASS方案中心启动方案计算通知
返回错误信息
取排线模型执行计划(t_lines_execute_plan)的pass输入(pass_in)字段(pass输入参数详情见:Confluence接口文档5.1)
处理待处理的模型模型任务启动通知
调方案中心:状态 -3;//内核返回结果处理失败
处理模型计算超时排线模型执行计划
上下游系统交互图
组装模型所参数
取最早的就绪的排线执行计划调用模型计算接口(接口详情见:pass排线模型接口文档 2)
查询mysql中排线模型执行计划表(t_lines_execute_plan)中pass_msg_receive_time在25小时内并且计划状态为模型输入组装中和就绪执行计划
排线模型执行计划表(t_lines_execute_plan)计划状态为模型计算超时
根据1.4的地区编码、参考开始时间、结束时间到HIVE上的 dm_analysis.tt_drsm_linehaul_out_batch_flow表拉取OD货量数据
模型排队计算
将hive查询的根据srcDeptCode、srcStoreBatch、destDeptCode、standardToOverlandFlag、periodIndex分组数字型的数据各自求平均值得到模型所需要的OD货量数据
凌晨同步mysql中的t_lines_execute_plan表pass_msg_receive_time在昨天范围内数据到hvie中dm_predict.tt_drsm_linehaul_execute_plan表
生成 排线模型执行计划表(t_lines_execute_plan)计划状态为 组装模型输入中
是否需要计算预测件量
更新排线模型执行计划表的计划转为计算成功
模型返回的结果是否为计算成功
更新 执行计划等待事件表(t_lines_execute_plan_await_event)更新事件结果消息字段的值事件状态为 不能满足
调方案中心:状态-8;//建模内核资源分配失败
将模型所需要的OD货量数据分页一页10万条插入oss,keys规则为:方案id/方案id_BatchFlowTemp_n.jsonn为从1到总页数并维护info数据在oss key规则为:方案id/方案id_BatchFlowTemp_info.json
通过拉取主题:EOS_DRSM_CORE_LINES_CREATE_BATCH_FLOWOD货量的kafka消息businessAugmenterKeys到oss中查询1.4的数据
插入mysql排线消息接收表(t_lines_msg_receive)消息处理状态:待处理
处理模型分配资源超时排线模型执行计划
消息中success是否为true
是否存在事件状态为不能满足
http
查询执行计划等待事件表t_lines_execute_plan_await_event根据方案id查询事件组为模型输入组装
查询mysql中排线消息接收表(t_lines_msg_receive)中创建时间在15分钟内并且消息状态为待处理的模型结果通知
模型输入方案id (processId)= pass输入方案id (processId)
读取模型输入元数据写入模型输出源数据
更新排线模型执行计划(t_lines_execute_plan)计划状态:执行中
查询mysql中排线模型执行计划表(t_lines_execute_plan)中pass_msg_receive_time在25小时内并且计划状态为执行中执行计划
模型输入resultSiteCapacityDeptKeysPrefix =方案id/方案id_SiteCapacity_dept_
插入mysql排线消息接收表(t_lines_msg_receive)消息处理状态:已处理
是否超时
调方案中心:(接口详情见:Confluence接口文档4.1)状态-8;//建模内核资源分配失败并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
除batchFlowKeys以外其他的模型所需的13份数据的keys = pass各个数据对象中的newBskey经过转化得到的keys(转化规则:到oss 通过 newBskey以_分割取第一个子串 /newBskey_info.json 为key获取原数据,取其中的size字段,keys中的元素为newBskey以_分割取第一个子串 /newBskey_n.json,n的值为1 到 size)
组装模型所需的参数
是否计算成功
接收模型计算结果通知(接口详情见:pass排线模型接口文档 2)
查询执行计划等待事件表t_lines_execute_plan_await_event创建时间在48小时内并且中状态为未发送的记录按创建时间升序排序取(2-待满足的条数)条数据
使用UDF解析模型输入的 城市场地映射到hive中dm_predict.tt_drsm_linehaul_in_city_depot表,模型输入的产品路由关系到hive中dm_predict.tt_drsm_linehaul_in_mode_transport表,模型输入的快标转陆运到hive中dm_predict.tt_drsm_linehaul_in_standard_to_overland表
查询执行计划等待事件表t_lines_execute_plan_await_event创建时间在48小时内并且状态为待满足的记录
模型输入resultAllRouteBatchSize和resultSingleDataBatchSize和resultSingleStepCargoBatchSize和resultSiteCapacityDeptBatchSize为配置值目前为10万
模型返回是否成功
调方案中心:(接口详情见:Confluence接口文档4.1)状态-4;//模型内核处理超时并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
回调方案中心:状态2;//建模任务执行中和预计完成时间
拉取计算的结果的原数据消息
应用程序消费od货量计算结果的源数据信息的消息
查询mysql中排线消息接收表(t_lines_msg_receive)中创建时间在15分钟内并且消息状态为待处理的模型任务启动通知
处理待处理的模型结果通知
接收模型计算结果通知
是否重复发送
写入模型输入读取模型输出
更新排线模型执行计划(t_lines_execute_plan)设置模型输入batchFlowTempKeys和更新计划状态为就绪
待满足的记录条数是否大于等于2
排线模型执行计划表(t_lines_execute_plan)计划状态为资源分配超时
是否为高峰方案(根据peakPeriod判断)
是否存在事件状态为待满足或未发送
模型计算服务(模型方负责维护)
更新 执行计划等待事件表(t_lines_execute_plan_await_event)更新事件结果消息字段的值事件状态为:已满足
是否计算超时当前时间是否大于(模型计算消息下发时间+12小时)
回调方案中心(接口详情见:Confluence接口文档4.1):状态6;//建模内核资源分配中,并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
是否分配资源超时当前时间是否大于(pass执行消息接收时间+12小时)
调方案中心:状态2;//建模任务执行中
处理排序相关任务的定时器(一分钟一次)
更新排线模型执行计划表的计划转为计算失败
回调方案中心(接口详情见:Confluence接口文档4.1):-2;//内核返回结果处理失败,并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
当前时间是否大于(模型计算消息接收时间 + 12小时 )
每天同步并解析1.1-1.3的数据到HIVE中
pass系统(数字化场地pass系统负责维护)
组成模型输入过程是否出现异常
batchFlowKeys值的规则和上面获取其他13份数据的keys一样
查询mysql中 排线消息接收表(t_lines_msg_receive)是否存在方案id相同的模型计算结果通知数据
处理将排线等待事件发送值KAFKA
读取模型输入写入模型输出
调方案中心:(接口详情见:Confluence接口文档4.1)状态2;//建模任务执行中、预计执行完成时间为执行计划表中的predict_finish_time并在mysql消息下发表(t_lines_msg_issue)中记录本次消息下发数据
回调方案中心:-6;//建模内核参数校验失败
模型输入resultSingleStepCargoKeysPrefix =方案id/方案id_SingleStepCargo_
模型输入resultAllRouteKeysPrefix =方案id/方案id_WholeStepRoute_
更新模型任务启动通知(t_lines_msg_receive)处理状态为已处理
接收PASS方案中心请求启动方案计算(接口详情见:Confluence接口文档4.2)
根据模型任务启动通知的消息组装模型所需的参数
处理 待处理的模型结果通知
OSS(资源挂在数字化场地)
periodIndex=(1-7)标示周几,周日为7
取最早的就绪的排线执行计划调用模型计算接口
回调方案中心:状态6;//建模内核资源分配中
数据接入中间层(dubbo服务)(大数据系统部负责维护)
periodIndex=0
写入方案id/方案id_SiteCargoQuantity_info.json方案id/方案id_SiteCapacityTransFlowedMO_info.json方案id/方案id_FLowCargoDetail_info.json
0 条评论
下一页