Browse Source

add-entity-created-events-in-rule-engine (#1696)

* add-entity-created-events-in-rule-engine

* optimize imports

* update

* fix typo

* update actionEventsFromRuleEngine
pull/1766/head
ShvaykaD 7 years ago
committed by Andrew Shvayka
parent
commit
2cf410eb1e
  1. 60
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  2. 1
      common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
  3. 15
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  4. 7
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java
  5. 5
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java
  6. 5
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java

60
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -17,6 +17,9 @@ package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.channel.EventLoopGroup;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.api.ListeningExecutor;
@ -30,6 +33,11 @@ import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@ -38,9 +46,11 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
@ -69,6 +79,8 @@ import java.util.function.Consumer;
*/
class DefaultTbContext implements TbContext {
public final static ObjectMapper mapper = new ObjectMapper();
private final ActorSystemContext mainCtx;
private final RuleNodeCtx nodeCtx;
@ -138,6 +150,48 @@ class DefaultTbContext implements TbContext {
return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
}
@Override
public void sendTbMsgToRuleEngine(TbMsg msg) {
mainCtx.getActorService().onMsg(new SendToClusterMsg(msg.getOriginator(), new ServiceToRuleEngineMsg(getTenantId(), msg)));
}
public TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId) {
try {
ObjectNode entityNode = mapper.valueToTree(customer);
return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, customer.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process customer created msg: " + e);
}
}
public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) {
try {
ObjectNode entityNode = mapper.valueToTree(device);
return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, device.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process device created msg: " + e);
}
}
public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) {
try {
ObjectNode entityNode = mapper.valueToTree(asset);
return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, asset.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process asset created msg: " + e);
}
}
public TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId) {
try {
ObjectNode entityNode = mapper.valueToTree(alarm);
return new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, alarm.getId(), getActionMetaData(ruleNodeId), mapper.writeValueAsString(entityNode), null, null, 0L);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process alarm created msg: " + e);
}
}
@Override
public RuleNodeId getSelfId() {
return nodeCtx.getSelf().getId();
@ -305,4 +359,10 @@ class DefaultTbContext implements TbContext {
return mainCtx.getCassandraBufferedRateExecutor();
}
private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ruleNodeId", ruleNodeId.toString());
return metaData;
}
}

1
common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java

@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
@Data
@EqualsAndHashCode(callSuper = true)

15
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -16,6 +16,10 @@
package org.thingsboard.rule.engine.api;
import io.netty.channel.EventLoopGroup;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
@ -38,7 +42,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
/**
* Created by ashvayka on 13.01.18.
@ -59,10 +62,20 @@ public interface TbContext {
void updateSelf(RuleNode self);
void sendTbMsgToRuleEngine(TbMsg msg);
TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data);
TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data);
TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId);
TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId);
TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId);
TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId);
RuleNodeId getSelfId();
TenantId getTenantId();

7
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java

@ -19,7 +19,11 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -56,6 +60,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
ctx.tellNext(msg, "False");
} else if (alarmResult.isCreated) {
ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Created");
ctx.sendTbMsgToRuleEngine(ctx.alarmCreatedMsg(alarmResult.alarm, ctx.getSelfId()));
} else if (alarmResult.isUpdated) {
ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated");
} else if (alarmResult.isCleared) {

5
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractCustomerActionNode.java

@ -121,10 +121,13 @@ public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerA
Customer newCustomer = new Customer();
newCustomer.setTitle(key.getCustomerTitle());
newCustomer.setTenantId(ctx.getTenantId());
return Optional.of(service.saveCustomer(newCustomer).getId());
Customer savedCustomer = service.saveCustomer(newCustomer);
ctx.sendTbMsgToRuleEngine(ctx.customerCreatedMsg(savedCustomer, ctx.getSelfId()));
return Optional.of(savedCustomer.getId());
}
return Optional.empty();
}
}
}

5
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java

@ -137,7 +137,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
}
}
protected String processPattern(TbMsg msg, String pattern){
protected String processPattern(TbMsg msg, String pattern) {
return TbNodeUtils.processPattern(pattern, msg.getMetaData());
}
@ -187,6 +187,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
newDevice.setType(entitykey.getType());
newDevice.setTenantId(ctx.getTenantId());
Device savedDevice = deviceService.saveDevice(newDevice);
ctx.sendTbMsgToRuleEngine(ctx.deviceCreatedMsg(savedDevice, ctx.getSelfId()));
targetEntity.setEntityId(savedDevice.getId());
}
break;
@ -201,6 +202,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
newAsset.setType(entitykey.getType());
newAsset.setTenantId(ctx.getTenantId());
Asset savedAsset = assetService.saveAsset(newAsset);
ctx.sendTbMsgToRuleEngine(ctx.assetCreatedMsg(savedAsset, ctx.getSelfId()));
targetEntity.setEntityId(savedAsset.getId());
}
break;
@ -214,6 +216,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
newCustomer.setTitle(entitykey.getEntityName());
newCustomer.setTenantId(ctx.getTenantId());
Customer savedCustomer = customerService.saveCustomer(newCustomer);
ctx.sendTbMsgToRuleEngine(ctx.customerCreatedMsg(savedCustomer, ctx.getSelfId()));
targetEntity.setEntityId(savedCustomer.getId());
}
break;

Loading…
Cancel
Save