2024-开源物联网平台Thingsboard—规则引擎
2024-07-30 09:00:07 5 举报
AI智能生成
规则引擎:rule-engine、规则链、规则节点
作者其他创作
大纲/内容
使用特点
组态化编辑
每个规则节点可以具有取决于规则节点实现的特定配置参数。例如,“过滤器-脚本”规则节点可通过处理传入数据的自定义JS函数进行配置
调试—事件模式
每个规则节点可以设置为调试模式,启用之后,在事件中可以查看到入站-出站消息
JavaScript函数测试台
一些规则节点具有特定的UI功能,允许用户测试JS函数。单击“ 测试过滤器功能”后,您将看到JS编辑器,可使用该编辑器替换输入参数并验证函数的输出。
能力
1、保存到数据库之前,对接收的遥测数据或属性进行验证和修改
2、将遥测或者属性从设备复制到相关资产,以便可以汇总遥测。
3、根据定义的条件对alarms进行创建、更新、清除
4、根据设备生命周期时间触发操作。如设备上线、设备离线状态,创建告警事件
5、加载所需的其他处理数据。在客户设备或租户属性中定义的设备的负载温度阈值。
6、触发对外部系统的REST API调用
7、发生复杂事件时发送电子邮件,并使用“电子邮件模板”中其他实体的属性。
8、根据定义的条件进行RPC调用。
9、与外部管道(如Kafka,Spark,AWS服务等)集成
是什么:基于事件构建的工作流,是一个高度可定制的框架,用于复杂事件的处理。
组成
Message(规则消息)
是什么?
接收系统中的各种消息
作用:用于接收任何事件,可以是来自设备的,设备生命周期事件、REST API事件、RPC请求等的传入
特点:可以被序列变化的有着规定的数据结构,表示系统中的各种消息
消息组成
消息ID(MessageId),基于时间的通用唯一标识符
消息发起者(Originator of the Message):DEVICE,Asset或者其他Entity标识符
消息类型(Type Of the Message):遥测或者不活动的事件
消息负载(Payload of the message):带有实际消息有效负载的json报文
元数据(Metadata):键值对的列表以及消息有关的其他数据
消息类型
发布遥测(Post telemetry)
消息类型名称
POST_TELEMETRY_REQUEST
消息元数据
deviceName - 设备原始名称,
deviceType - 设备原始类型,
requestId - RPC request Id provided by client
消息负载格式
{"temperature": 22.7}
从服务端到设备的rpc请求(RPC Request to Device)
消息类型名称
RPC_CALL_FROM_SERVER_TO_DEVICE
消息元数据
deviceName 、deviceType
消息负载格式
json containing method and params:
{
"method": "getGpioStatus",
"params": { "param1": "val1" }
}
连接事件(Connect Event)
消息类型名称
CONNECT_EVENT
消息元数据
deviceName 、deviceType
消息负载格式
与活动事件负载格式一致
断开事件(Disconnect Event)
消息类型名称
DISCONNECT_EVENT
消息元数据
deviceName 、deviceType
消息负载格式
与活动事件负载格式一致
活动事件(Activity Event)
消息类型名称
ACTIVITY_EVENT
消息元数据
deviceName 、deviceType
消息负载格式
json containing device activity information:
{
"active": true,
"lastConnectTime": 1526979083267,
"lastActivityTime": 1526979083270,
"lastDisconnectTime": 1526978493963,
"lastInactivityAlarmTime": 1526978512339,
"inactivityTimeout": 10000
}
发布属性(Post attributes )
消息类型名称
POST_ATTRIBUTES_REQUEST
消息元数据
deviceName 、deviceType
消息负载格式
{"currentState": "IDLE"}
从设备到服务器的RPC请求(RPC Request from Device)
消息类型名称
TO_SERVER_RPC_REQUEST
消息元数据
deviceName 、deviceType
消息负载格式
json containing method and params:
{"method": "getTime","params": { "param1": "val1" }}
不活跃事件(Inactivity Event)
消息类型名称
INACTIVITY_EVENT
消息元数据
deviceName 、deviceType
消息负载格式
与活动事件负载格式一致
实体已创建事件(Entity Created)
消息类型名称
ENTITY_CREATED
消息元数据
userName - name of the user who created the entity,
userId - the user Id
消息负载格式
json containing created entity details:
{
"id": {
"entityType": "DEVICE",
"id": "efc4b9e0-5d0f-11e8-8559-37a7f8cdca74"
},
"createdTime": 1526918366334,
...
"name": "my-device",
"type": "temp-sensor"
}
实体更新事件(Entity Updated)
消息类型名称
ENTITY_UPDATED
消息元数据
预实体创建事件一致
消息负载格式
预实体创建事件一致
实体被删除事件(Entity Deleted)
消息类型名称
ENTITY_DELETED
消息元数据
预实体创建事件一致
消息负载格式
预实体创建事件一致
实体被分配到用户事件(Entity Assigned)
消息类型名称
ENTITY_ASSIGNED
消息元数据
userName - 执行实体分配的人
userId - 用户id
assignedCustomerName - 被分配给用户的名称
assignedCustomerId -用户的id
消息负载格式
预实体创建事件一致
实体取消分配事件(Entity Unassigned)
消息类型名称
ENTITY_UNASSIGNED
消息元数据
userName -执行取消分配实体的人
userId - 用户id
unassignedCustomerName -被取消的用户名称
unassignedCustomerId -用户id
消息负载格式
预实体创建事件一致
属性更新事件(Attributes Updated)
消息类型名称
ATTRIBUTES_UPDATED
消息元数据
userName - 执行属性更新的人
userId - 用户id
scope - 属性的范围(服务端属性、共享属性)
消息负载格式
key/value json串,被更新的属性名称-值集合
{
"softwareVersion": "1.2.3"
}
属性删除事件(Attributes Deleted)
消息类型名称
ATTRIBUTES_DELETED
消息元数据
userName - 谁删除的属性
userId - 用户id
scope - 属性的范围(服务端属性、还是共享属性)
消息负载格式
被删除的属性集合
{
"attributes": ["modelNumber", "serial"]
}
告警事件(Alarm event)
消息类型名称
ALARM
消息元数据
消息元数据的所有字段
isNewAlarm - 是否是新告警
isExistingAlarm - 是否是已存在告警
isClearedAlarm -告警是否已清除
消息负载格式
告警详情
{
"tenantId": {
...
},
"type": "High Temperature Alarm",
"originator": {
...
},
"severity": "CRITICAL",
"status": "CLEARED_UNACK",
"startTs": 1526985698000,
"endTs": 1526985698000,
"ackTs": 0,
"clearTs": 1526985712000,
"details": {
"temperature": 70,
"ts": 1526985696000
},
"propagate": true,
"id": "33cd8999-5dac-11e8-bbab-ad47060c9431",
"createdTime": 1526985698000,
"name": "High Temperature Alarm"
}
通过REST API 请求规则引擎事件(REST API Request to Rule Engine)
消息类型名称
REST_API_REQUEST
消息元数据
requestUUID - 请求的id
expirationTime - 超时时间
消息负载格式
请求的报文json
Rule Node(规则节点)
是什么?规则节点是规则引擎的基本组件,它一次处理单个传入消息并生成一个或多个传出消息。规则节点是规则引擎的主要逻辑单元。规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。
作用
规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。提供节点自定义能力,实现数据的运算。
规则节点之间的关系
规则节点可能与其他规则节点相关。每个关系都有关系类型,这是用于标识关系的逻辑含义的标签。当规则节点生成传出消息时,它总是指定用于将消息路由到下一个节点的关系类型。
规则节点类型
过滤节点
用于消息过滤和路由,过滤成功走真链、错误走假链
节点类型
检查关系过滤器节点(check relation)
根据类型和方向检查从所选实体到消息发起者的关系。
检查存在字段节点( check existence fields)
检查消息、元数据中是否包含指定字段
消息类型过滤器节点(message Type)
在节点配置中,管理员为传入消息定义了一组允许的消息类型。系统中有预定义的消息类型,例如Post属性,Post Telemetry,RPC Request等。管理员还可以在节点配置中定义任何Custom Message Types。
消息类型切换节点(Message Type Switch)
按消息类型路由传入的消息。如果传入的消息具有已知的消息类型,则将其发送到相应的链,否则,将消息发送到其他链。
发起者类型过滤器节点(originator Type)
发起者类型过滤(设备、资产、租户、客户等)
发起方类型交换节点(originator type switch)
根据发起者实体类型进行数据路由分发(与消息类型切换节点相似)
脚本过滤器节点(script)
使用javascript条件进行消息过滤(消息msg,metadata消息元数据,msgType消息类型)
交换节点(switch)
将传入消息路由到一个或多个输出链,节点执行已配置的JavaScript函数。
GPS地理围栏过滤器节点(gps genfencing filter)
根据传入的消息或者元数据,检查经纬度是否在配置的地理围栏之内(圆、方)
属性集
用于更新传入消息的元数据
节点类型
消息发起方用户属性(customer attributes)
将消息发起方的用户属性或者遥测数据加入metadata
消息发起方用户详情(customer details)
将消息发起方的用户详情数据(客户自定义属性)加入metadata
设备属性(device attributes)
将消息发起方的设备属性或者遥测数据加入metadata
发起者属性(originator attribute)
将消息发起方的(客户端属性\服务端属性|共享属性)或者遥测数据加入metadata
发起方字段(originator field)
发起方的字段加入metadata
发起方遥测(originator telemetry)
发起方的遥测数据加入metadata
相关联属性值(related attributes)
节点使用配置的查询查找“消息发起者”实体的“相关实体”,并将“属性”或“最新遥测”值添加到“消息元数据”中。
租户属性(tenant attribute)
将消息发起方的租户属性加入metadata
租户详情(tenant detail)
将消息发起方的租户详情加入metadata中
转换节点
用户更改创立的消息字段,比如,发起方、类型、有效负载,元数据
节点类型
变更发起者(change originator)
作用:将消息更改为其他发起者,此消息可以作为你另一个实体的消息处理(比如设备发的消息,可以更改为客户、租户、实体进行处理)比如将设备遥测数据复制到更高级别如租户、资产级别中
脚本转换节点(script)
作用:修改消息内容(msg(消息负载),msgType(消息类型),metadata(元数据)),可增加,修改
转换到电子邮件节点(to email)
通过使用从消息元数据派生的值填充电子邮件字段,将消息转换为电子邮件消息。设置“ SEND_EMAIL”输出消息类型,以后可以被“ 发送电子邮件节点”接受。可以将所有电子邮件字段配置为使用元数据中的值。
动作阶段
根据传入的消息执行各种动作
节点类型
将消息发起实体分配给客户节点(assign to customer)
发起方实体(消息发起者类型):资产,设备,实体视图,仪表板。
方法:通过客户名称模式查找目标客户,然后将发起者实体分配给该客户
将消息发起方取消分配给客户(unassign from customer )
create alarm(创建告警)
组成
告警类型
告警描述
用法:通过脚本过滤后,对满足条件的消息进行告警规则设置
clear alarm(清除告警)
delay(延迟消息)
create relation(创建关系)
delete relation(删除关系)
log(创建日志)
save attribute 保存属性
save timeseries 保存时序数据
rpc call reply
rpc call request
定时生成消息(generator)生成具有可配置周期的消息。JavaScript函数用于生成消息。
消息生成频率(以秒为单位)
消息发起者
JavaScript函数将生成实际的消息。
prevMsg -是先前生成的消息有效负载。
prevMetadata -是先前生成的消息元数据。
prevMsgType -是先前生成的消息类型。
message count
gps geofencing events
synchronized end
synchronized start
copy to view
外部节点
用于与外部系统进行交互
节点类型
AWS SNS 节点
作用:将消息发布到aws sns(亚马逊简单消息通知服务,是一种发布\订阅模式的消息收发服务)
AWS SNS 开发指南:https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
服务需付费使用
AWS SQS节点
作用:将消息发布到aws SQS (简单消息队列服务),支持先进先出(FIFO,高吞吐,仅传送一次,先进先出)、standard模式(无限吞吐,至少传一次,最大努力排序(出队顺序优化))
AWS SQS 开发指南:https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
服务需付费使用
kafka节点
作用:将消息发送到kafka的topic上
开源免费
配置属性
主题模式(topic) -可以是静态字符串,也可以是使用消息元数据属性解析的模式。例如${deviceType}
引导服务器(bootstrap servers) -用逗号分隔的kafka代理列表。
自动重试次数(Automatically retry times ) -如果连接失败,尝试重发消息的次数。
产生批处理大小(Produces batch size) -批处理大小(以字节为单位),用于对具有相同分区的消息进行分组。
本地缓存时间(Time to buffer locally)时间在本地进行缓冲以毫秒最大本地缓冲窗口时间
客户端缓冲区最大大小(Client buffer max size) -用于发送消息的最大缓冲区大小(以字节为单位)。
确认数(Number of acknowledgments)-认为请求完成之前节点需要接收的确认数。
密钥序列化器(Key serializer ) -默认情况下为org.apache.kafka.common.serialization.StringSerializer
值序列化器(Value serializer) -默认情况下为org.apache.kafka.common.serialization.StringSerializer
其他属性 (Other properties)-可以为kafka代理连接提供任何其他附加属性。
发布的主体 -节点将向Kafka主题发送完整的消息有效负载。如果需要,可以将规则链配置为使用转换节点链将正确的有效载荷发送到Kafka。
来自此节点的出站消息将在消息元数据中包含响应偏移量,分区和主题属性。原始消息有效负载,类型和发起者将不会更改。
MQTT节点
作用:使用最少一次策略,将传入消息有效负载发送到已配置好的MQTT代理服务topic上
RabbitMQ节点
作用:将传入的消息有效负载发布到RabbitMq上
REST API 调用节点
作用:使用REST API 调用到外部REST服务器。
send email 发送电子邮件节点
作用:节点使用已配置的邮件服务器发送传入消息。
gcp pubsub 谷歌云平台的发布订阅服务
Rule Chain(规则链)
是什么?规则链是规则节点及其关系的逻辑组
接收来自节点的出站消息将其发送至下一个节点。
用法:租户管理员可以定义一个“ 根”规则链,还可以定义多个其他规则链。根规则链处理所有传入的消息,并将其转发到其他规则链以进行其他处理。其他规则链也可以将消息转发到不同的规则链。
0 条评论
下一页