TB规则引擎基于两个主要组件:actor模型和消息队列

Actor model

Actor模型

Actor模型支持在服务器端API调用的情况下,对设备传输层的消息进行高性能并发处理。TB使用Akka作为Actor系统实现。与规则引擎相关的主要Actors有两个:规则链Actor和规则节点Actor。

规则链Actor

规则链Actor负责规则节点配置、路由规则节点之间的消息以及处理队列put和ack命令。每个规则链Actor表示由用户配置的单个规则链。规则链Actor是多个规则节点Actor的父元素。

规则节点Actor

规则节点Actor负责处理传入的消息。消息处理逻辑高度可定制。系统内置RuleNodes实现,您还可以开发自定义规则节点实现。更多细节请参阅自定义指南。

每个传入消息都被推到队列中,然后由规则引擎进行处理。例如,设备通过MQTT推送遥测消息后,遥测消息将会被存储到消息队列中。队列确认推送操作之后,ThingsBoard将消息传送确认返回至设备。 规则引擎开始处理传入消息时,消息将被确认并从队列中删除。 在服务器重新启动或规则链重新配置期间,存储在队列中的尚未被确认的所有消息会被重新处理。 消息队列还具有节流功能,可以限制每个租户在单个时间点上处理的消息数量。

消息队列

规则引擎在两种情况下确认消息:

  • 规则链中的最后一个规则节点处理消息

  • 规则节点将消息推送到多个其他规则节点。在这种情况下,规则链actor将传入消息复制到多个传出消息。发送的消息将被单独推送到队列中,传入消息将被确认。

内存消息队列

内存消息队列是community edition中可用的默认消息队列。这种类型的队列允许在队列中配置最大消息计数。在一定时间内未被确认的消息将会被从队列中删除。

基于Cassandra的消息队列

在Cassandra DB中存储所有数据的持久消息队列。TB只提供专业版Cassandra DB。

基于PostgreSQL的消息队列

持久消息队列存储PostgreSQL DB中的所有数据。TB只提供专业版PostgreSQL。

自定义消息队列实现

因为项目是开源的,所以你可以实现自己的消息队列

自定义指南

实现TbNode接口来创建新的规则节点。

package org.thingsboard.rule.engine.api;

...

public interface TbNode {

    void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;

    void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;

    void destroy();

}

使用以下注释来注释您的实现:

org.thingsboard.rule.engine.api.RuleNode

results matching ""

    No results matching ""