diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 604626a050..98610082f9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -17,6 +17,9 @@ package org.thingsboard.server.actors.ruleChain; import akka.actor.ActorRef; import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.channel.EventLoopGroup; import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.api.ListeningExecutor; @@ -30,6 +33,11 @@ import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; @@ -38,9 +46,11 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.cluster.ServerType; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; @@ -69,6 +79,8 @@ import java.util.function.Consumer; */ class DefaultTbContext implements TbContext { + public final static ObjectMapper mapper = new ObjectMapper(); + private final ActorSystemContext mainCtx; private final RuleNodeCtx nodeCtx; @@ -138,6 +150,48 @@ class DefaultTbContext implements TbContext { return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId()); } + @Override + public void sendTbMsgToRuleEngine(TbMsg msg) { + mainCtx.getActorService().onMsg(new SendToClusterMsg(msg.getOriginator(), new ServiceToRuleEngineMsg(getTenantId(), msg))); + } + + public TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId) { + try { + ObjectNode entityNode = mapper.valueToTree(customer); + return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, customer.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L); + } catch (JsonProcessingException | IllegalArgumentException e) { + throw new RuntimeException("Failed to process customer created msg: " + e); + } + } + + public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) { + try { + ObjectNode entityNode = mapper.valueToTree(device); + return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, device.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L); + } catch (JsonProcessingException | IllegalArgumentException e) { + throw new RuntimeException("Failed to process device created msg: " + e); + } + } + + public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) { + try { + ObjectNode entityNode = mapper.valueToTree(asset); + return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, asset.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L); + } catch (JsonProcessingException | IllegalArgumentException e) { + throw new RuntimeException("Failed to process asset created msg: " + e); + } + } + + public TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId) { + try { + ObjectNode entityNode = mapper.valueToTree(alarm); + return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, alarm.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L); + } catch (JsonProcessingException | IllegalArgumentException e) { + throw new RuntimeException("Failed to process alarm created msg: " + e); + } + } + + @Override public RuleNodeId getSelfId() { return nodeCtx.getSelf().getId(); @@ -305,4 +359,10 @@ class DefaultTbContext implements TbContext { return mainCtx.getCassandraBufferedRateExecutor(); } + private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) { + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("ruleNodeId", ruleNodeId.toString()); + return metaData; + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java index 49b30157e3..64233ccb67 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java @@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.id.TenantId; @Data @EqualsAndHashCode(callSuper = true) diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index d3871e383a..e401a1c846 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -16,6 +16,10 @@ package org.thingsboard.rule.engine.api; import io.netty.channel.EventLoopGroup; +import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; @@ -38,7 +42,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; /** * Created by ashvayka on 13.01.18. @@ -59,10 +62,20 @@ public interface TbContext { void updateSelf(RuleNode self); + void sendTbMsgToRuleEngine(TbMsg msg); + TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data); TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data); + TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId); + + TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId); + + TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId); + + TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId); + RuleNodeId getSelfId(); TenantId getTenantId(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java index d8ab52791a..6132ad0df3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java @@ -19,7 +19,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.api.*; +import org.thingsboard.rule.engine.api.ScriptEngine; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -56,6 +60,7 @@ public abstract class TbAbstractAlarmNode