Browse Source

Merge branch 'master' into develop/3.3

pull/4143/head
Igor Kulikov 5 years ago
parent
commit
67aba9e4b1
  1. 4
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  2. 23
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 13
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
  4. 30
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
  5. 18
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
  6. 7
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
  7. 2
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
  8. 34
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
  9. 17
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfMsg.java
  10. 42
      application/src/main/java/org/thingsboard/server/actors/ruleChain/TbToRuleNodeActorMsg.java
  11. 8
      application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java
  12. 2
      application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistTick.java
  13. 4
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  14. 5
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
  15. 6
      application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java
  16. 2
      application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
  17. 2
      application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
  18. 2
      common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java
  19. 2
      common/actor/src/main/java/org/thingsboard/server/actors/TbActorException.java
  20. 20
      common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java
  21. 5
      common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
  22. 8
      common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java
  23. 22
      common/message/src/main/java/org/thingsboard/server/common/msg/TbActorStopReason.java
  24. 25
      common/message/src/main/java/org/thingsboard/server/common/msg/TbRuleEngineActorMsg.java
  25. 38
      common/message/src/main/java/org/thingsboard/server/common/msg/queue/QueueToRuleEngineMsg.java
  26. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeException.java
  27. 12
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java
  28. 2
      dao/src/test/resources/nosql-test.properties
  29. 4
      dao/src/test/resources/sql-test.properties
  30. 5
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  31. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceCredentialsUpdateNotificationMsg.java
  32. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java
  33. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java
  34. 19
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java
  35. 27
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java
  36. 11
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java
  37. 23
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java
  38. 25
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DynamicPredicateValueCtx.java
  39. 74
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DynamicPredicateValueCtxImpl.java
  40. 4
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/ProfileState.java
  41. 4
      rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js
  42. 257
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java
  43. 2
      ui-ngx/src/app/modules/home/components/filter/filter-predicate-value.component.ts
  44. 4
      ui-ngx/src/assets/locale/locale.constant-el_GR.json
  45. 4
      ui-ngx/src/assets/locale/locale.constant-uk_UA.json

4
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -134,12 +134,12 @@ public class AppActor extends ContextAwareActor {
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
if (!deletedTenants.contains(msg.getTenantId())) {
getOrCreateTenantActor(msg.getTenantId()).tell(msg);
} else {
msg.getTbMsg().getCallback().onSuccess();
msg.getMsg().getCallback().onSuccess();
}
}
}

23
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -19,7 +19,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService;
@ -34,7 +33,6 @@ import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
@ -90,10 +88,12 @@ class DefaultTbContext implements TbContext {
public final static ObjectMapper mapper = new ObjectMapper();
private final ActorSystemContext mainCtx;
private final String ruleChainName;
private final RuleNodeCtx nodeCtx;
public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) {
public DefaultTbContext(ActorSystemContext mainCtx, String ruleChainName, RuleNodeCtx nodeCtx) {
this.mainCtx = mainCtx;
this.ruleChainName = ruleChainName;
this.nodeCtx = nodeCtx;
}
@ -117,13 +117,13 @@ class DefaultTbContext implements TbContext {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}
@Override
public void tellSelf(TbMsg msg, long delayMs) {
//TODO: add persistence layer
scheduleMsgWithDelay(new RuleNodeToSelfMsg(msg), delayMs, nodeCtx.getSelfActor());
scheduleMsgWithDelay(new RuleNodeToSelfMsg(this, msg), delayMs, nodeCtx.getSelfActor());
}
@Override
@ -254,7 +254,8 @@ class DefaultTbContext implements TbContext {
} else {
failureMessage = null;
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(),
nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
msg, failureMessage));
}
@ -301,6 +302,16 @@ class DefaultTbContext implements TbContext {
return nodeCtx.getSelf().getId();
}
@Override
public RuleNode getSelf() {
return nodeCtx.getSelf();
}
@Override
public String getRuleChainName() {
return ruleChainName;
}
@Override
public TenantId getTenantId() {
return nodeCtx.getTenantId();

13
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java

@ -23,7 +23,6 @@ import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
@ -197,11 +196,11 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
TbMsg msg = envelope.getTbMsg();
TbMsg msg = envelope.getMsg();
log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
try {
checkActive(envelope.getTbMsg());
checkActive(envelope.getMsg());
RuleNodeId targetId = msg.getRuleNodeId();
RuleNodeCtx targetCtx;
if (targetId == null) {
@ -218,12 +217,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
msg.getCallback().onSuccess();
}
} catch (RuleNodeException rne) {
envelope.getTbMsg().getCallback().onFailure(rne);
envelope.getMsg().getCallback().onFailure(rne);
} catch (Exception e) {
envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
envelope.getMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
}
} else {
onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
onTellNext(envelope.getMsg(), envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
}
}
@ -335,7 +334,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
if (nodeCtx != null) {
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType));
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
} else {
log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));

30
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java

@ -15,24 +15,44 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg {
@EqualsAndHashCode(callSuper = true)
@ToString
public final class RuleChainToRuleChainMsg extends TbRuleEngineActorMsg implements RuleChainAwareMsg {
@Getter
private final RuleChainId target;
@Getter
private final RuleChainId source;
private final TbMsg msg;
@Getter
private final String fromRelationType;
public RuleChainToRuleChainMsg(RuleChainId target, RuleChainId source, TbMsg tbMsg, String fromRelationType) {
super(tbMsg);
this.target = target;
this.source = source;
this.fromRelationType = fromRelationType;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message = reason == TbActorStopReason.STOPPED ? String.format("Rule chain [%s] stopped", target.getId()) : String.format("Failed to initialize rule chain [%s]!", target.getId());
msg.getCallback().onFailure(new RuleEngineException(message));
}
@Override
public RuleChainId getRuleChainId() {
return target;

18
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java

@ -15,22 +15,28 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RuleChainToRuleNodeMsg implements TbActorMsg {
@EqualsAndHashCode(callSuper = true)
@ToString
final class RuleChainToRuleNodeMsg extends TbToRuleNodeActorMsg {
private final TbContext ctx;
private final TbMsg msg;
@Getter
private final String fromRelationType;
public RuleChainToRuleNodeMsg(TbContext ctx, TbMsg tbMsg, String fromRelationType) {
super(ctx, tbMsg);
this.fromRelationType = fromRelationType;
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_CHAIN_TO_RULE_MSG;

7
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java

@ -59,9 +59,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
case RULE_CHAIN_TO_RULE_MSG:
onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
break;
case RULE_TO_SELF_ERROR_MSG:
onRuleNodeToSelfErrorMsg((RuleNodeToSelfErrorMsg) msg);
break;
case RULE_TO_SELF_MSG:
onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg);
break;
@ -101,10 +98,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
}
}
private void onRuleNodeToSelfErrorMsg(RuleNodeToSelfErrorMsg msg) {
logAndPersist("onRuleMsg", ActorSystemContext.toException(msg.getError()));
}
public static class ActorCreator extends ContextBasedCreator {
private final TenantId tenantId;

2
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java

@ -54,7 +54,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
this.ruleChainName = ruleChainName;
this.self = self;
this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
this.defaultCtx = new DefaultTbContext(systemContext, new RuleNodeCtx(tenantId, parent, self, ruleNode));
this.defaultCtx = new DefaultTbContext(systemContext, ruleChainName, new RuleNodeCtx(tenantId, parent, self, ruleNode));
this.info = new RuleNodeInfo(ruleNodeId, ruleChainName, ruleNode != null ? ruleNode.getName() : "Unknown");
}

34
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java

@ -15,11 +15,16 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import java.io.Serializable;
import java.util.Set;
@ -27,15 +32,34 @@ import java.util.Set;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
class RuleNodeToRuleChainTellNextMsg implements TbActorMsg, Serializable {
@EqualsAndHashCode(callSuper = true)
@ToString
class RuleNodeToRuleChainTellNextMsg extends TbRuleEngineActorMsg implements Serializable {
private static final long serialVersionUID = 4577026446412871820L;
@Getter
private final RuleChainId ruleChainId;
@Getter
private final RuleNodeId originator;
@Getter
private final Set<String> relationTypes;
private final TbMsg msg;
@Getter
private final String failureMessage;
public RuleNodeToRuleChainTellNextMsg(RuleChainId ruleChainId, RuleNodeId originator, Set<String> relationTypes, TbMsg tbMsg, String failureMessage) {
super(tbMsg);
this.ruleChainId = ruleChainId;
this.originator = originator;
this.relationTypes = relationTypes;
this.failureMessage = failureMessage;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message = reason == TbActorStopReason.STOPPED ? String.format("Rule chain [%s] stopped", ruleChainId.getId()) : String.format("Failed to initialize rule chain [%s]!", ruleChainId.getId());
msg.getCallback().onFailure(new RuleEngineException(message));
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG;

17
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfMsg.java

@ -15,18 +15,25 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RuleNodeToSelfMsg implements TbActorMsg {
@EqualsAndHashCode(callSuper = true)
@ToString
final class RuleNodeToSelfMsg extends TbToRuleNodeActorMsg {
private final TbMsg msg;
public RuleNodeToSelfMsg(TbContext ctx, TbMsg tbMsg) {
super(ctx, tbMsg);
}
@Override
public MsgType getMsgType() {

42
application/src/main/java/org/thingsboard/server/actors/ruleChain/TbToRuleNodeActorMsg.java

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
@EqualsAndHashCode(callSuper = true)
public abstract class TbToRuleNodeActorMsg extends TbRuleEngineActorMsg {
@Getter
private final TbContext ctx;
public TbToRuleNodeActorMsg(TbContext ctx, TbMsg tbMsg) {
super(tbMsg);
this.ctx = ctx;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message = reason == TbActorStopReason.STOPPED ? "Rule node stopped" : "Failed to initialize rule node!";
msg.getCallback().onFailure(new RuleNodeException(message, ctx.getRuleChainName(), ctx.getSelf()));
}
}

8
application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java

@ -28,10 +28,10 @@ import org.thingsboard.server.common.msg.TbActorMsg;
@ToString
public final class StatsPersistMsg implements TbActorMsg {
private long messagesProcessed;
private long errorsOccurred;
private TenantId tenantId;
private EntityId entityId;
private final long messagesProcessed;
private final long errorsOccurred;
private final TenantId tenantId;
private final EntityId entityId;
@Override
public MsgType getMsgType() {

2
application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistTick.java

@ -18,7 +18,7 @@ package org.thingsboard.server.actors.stats;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
public final class StatsPersistTick implements TbActorMsg{
public final class StatsPersistTick implements TbActorMsg {
@Override
public MsgType getMsgType() {
return MsgType.STATS_PERSIST_TICK_MSG;

4
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -119,7 +119,7 @@ public class TenantActor extends RuleChainManagerActor {
log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
queueMsg.getTbMsg().getCallback().onSuccess();
queueMsg.getMsg().getCallback().onSuccess();
} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
transportMsg.getCallback().onSuccess();
@ -177,7 +177,7 @@ public class TenantActor extends RuleChainManagerActor {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
return;
}
TbMsg tbMsg = msg.getTbMsg();
TbMsg tbMsg = msg.getMsg();
if (apiUsageState.isReExecEnabled()) {
if (tbMsg.getRuleChainId() == null) {
if (getRootChainActor() != null) {

5
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java

@ -181,7 +181,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
new TbMsgPackCallback(id, tenantId, ctx);
try {
if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
if (!toRuleEngineMsg.getTbMsg().isEmpty()) {
forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
} else {
callback.onSuccess();
@ -209,6 +209,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (statsEnabled) {
stats.log(result, decision.isCommit());
}
ctx.cleanup();
if (decision.isCommit()) {
submitStrategy.stop();
break;

6
application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java

@ -147,4 +147,10 @@ public class TbMsgPackProcessingContext {
.forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel()));
}
}
public void cleanup() {
pendingMap.clear();
successMap.clear();
failedMap.clear();
}
}

2
application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java

@ -31,6 +31,8 @@ import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
@RequiredArgsConstructor
public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg {
private static final long serialVersionUID = -8592877558138716589L;
@Getter
private final String serviceId;
@Getter

2
application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java

@ -34,6 +34,8 @@ import java.util.UUID;
@Data
public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAwareMsg, TenantAwareMsg, Serializable {
private static final long serialVersionUID = 7191333353202935941L;
private final TenantId tenantId;
private final DeviceId deviceId;
private final TransportToDeviceActorMsg msg;

2
common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java

@ -30,7 +30,7 @@ public interface TbActor {
}
default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
return InitFailureStrategy.retryWithDelay(5000 * attempt);
return InitFailureStrategy.retryWithDelay(5000L * attempt);
}
default ProcessFailureStrategy onProcessFailure(Throwable t) {

2
common/actor/src/main/java/org/thingsboard/server/actors/TbActorException.java

@ -17,6 +17,8 @@ package org.thingsboard.server.actors;
public class TbActorException extends Exception {
private static final long serialVersionUID = 8209771144711980882L;
public TbActorException(String message, Throwable cause) {
super(message, cause);
}

20
common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java

@ -18,6 +18,7 @@ package org.thingsboard.server.actors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -49,6 +50,7 @@ public final class TbActorMailbox implements TbActorCtx {
private final AtomicBoolean busy = new AtomicBoolean(FREE);
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
private final AtomicBoolean destroyInProgress = new AtomicBoolean();
private volatile TbActorStopReason stopReason;
public void initActor() {
dispatcher.getExecutor().execute(() -> tryInit(1));
@ -70,6 +72,7 @@ public final class TbActorMailbox implements TbActorCtx {
InitFailureStrategy strategy = actor.onInitFailure(attempt, t);
if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
stopReason = TbActorStopReason.INIT_FAILED;
system.stop(selfId);
} else if (strategy.getRetryDelay() > 0) {
log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay());
@ -84,12 +87,16 @@ public final class TbActorMailbox implements TbActorCtx {
}
private void enqueue(TbActorMsg msg, boolean highPriority) {
if (highPriority) {
highPriorityMsgs.add(msg);
if (!destroyInProgress.get()) {
if (highPriority) {
highPriorityMsgs.add(msg);
} else {
normalPriorityMsgs.add(msg);
}
tryProcessQueue(true);
} else {
normalPriorityMsgs.add(msg);
msg.onTbActorStopped(stopReason);
}
tryProcessQueue(true);
}
private void tryProcessQueue(boolean newMsg) {
@ -180,11 +187,16 @@ public final class TbActorMailbox implements TbActorCtx {
}
public void destroy() {
if (stopReason == null) {
stopReason = TbActorStopReason.STOPPED;
}
destroyInProgress.set(true);
dispatcher.getExecutor().execute(() -> {
try {
ready.set(NOT_READY);
actor.destroy();
highPriorityMsgs.forEach(msg -> msg.onTbActorStopped(stopReason));
normalPriorityMsgs.forEach(msg -> msg.onTbActorStopped(stopReason));
} catch (Throwable t) {
log.warn("[{}] Failed to destroy actor: {}", selfId, t);
}

5
common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java

@ -66,11 +66,6 @@ public enum MsgType {
*/
REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG,
/**
* Message that is sent by RuleActor implementation to RuleActor itself to log the error.
*/
RULE_TO_SELF_ERROR_MSG,
/**
* Message that is sent by RuleActor implementation to RuleActor itself to process the message.
*/

8
common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java

@ -22,4 +22,12 @@ public interface TbActorMsg {
MsgType getMsgType();
/**
* Executed when the target TbActor is stopped or destroyed.
* For example, rule node failed to initialize or removed from rule chain.
* Implementation should cleanup the resources.
*/
default void onTbActorStopped(TbActorStopReason reason) {
}
}

22
common/message/src/main/java/org/thingsboard/server/common/msg/TbActorStopReason.java

@ -0,0 +1,22 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
public enum TbActorStopReason {
INIT_FAILED, STOPPED
}

25
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java → common/message/src/main/java/org/thingsboard/server/common/msg/TbRuleEngineActorMsg.java

@ -13,25 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
package org.thingsboard.server.common.msg;
import lombok.Data;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import lombok.EqualsAndHashCode;
import lombok.Getter;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RuleNodeToSelfErrorMsg implements TbActorMsg {
@EqualsAndHashCode
public abstract class TbRuleEngineActorMsg implements TbActorMsg {
private final TbMsg msg;
private final Throwable error;
@Getter
protected final TbMsg msg;
@Override
public MsgType getMsgType() {
return MsgType.RULE_TO_SELF_ERROR_MSG;
public TbRuleEngineActorMsg(TbMsg msg) {
this.msg = msg;
}
}

38
common/message/src/main/java/org/thingsboard/server/common/msg/queue/QueueToRuleEngineMsg.java

@ -15,32 +15,58 @@
*/
package org.thingsboard.server.common.msg.queue;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import java.io.Serializable;
import java.util.Set;
/**
* Created by ashvayka on 15.03.18.
*/
@Data
public final class QueueToRuleEngineMsg implements TbActorMsg {
@ToString
@EqualsAndHashCode(callSuper = true)
public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg {
@Getter
private final TenantId tenantId;
private final TbMsg tbMsg;
@Getter
private final Set<String> relationTypes;
@Getter
private final String failureMessage;
public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) {
super(tbMsg);
this.tenantId = tenantId;
this.relationTypes = relationTypes;
this.failureMessage = failureMessage;
}
@Override
public MsgType getMsgType() {
return MsgType.QUEUE_TO_RULE_ENGINE_MSG;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message;
if (msg.getRuleChainId() != null) {
message = reason == TbActorStopReason.STOPPED ?
String.format("Rule chain [%s] stopped", msg.getRuleChainId().getId()) :
String.format("Failed to initialize rule chain [%s]!", msg.getRuleChainId().getId());
} else {
message = reason == TbActorStopReason.STOPPED ? "Rule chain stopped" : "Failed to initialize rule chain!";
}
msg.getCallback().onFailure(new RuleEngineException(message));
}
public boolean isTellNext() {
return relationTypes != null && !relationTypes.isEmpty();
}
}

4
common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeException.java

@ -24,6 +24,9 @@ import org.thingsboard.server.common.data.rule.RuleNode;
@Slf4j
public class RuleNodeException extends RuleEngineException {
private static final long serialVersionUID = -1776681087370749776L;
@Getter
private final String ruleChainName;
@Getter
@ -33,6 +36,7 @@ public class RuleNodeException extends RuleEngineException {
@Getter
private final RuleNodeId ruleNodeId;
public RuleNodeException(String message, String ruleChainName, RuleNode ruleNode) {
super(message);
this.ruleChainName = ruleChainName;

12
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java

@ -22,6 +22,10 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -107,8 +111,8 @@ public class TbKafkaSettings {
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return props;
}
@ -120,8 +124,8 @@ public class TbKafkaSettings {
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return props;
}

2
dao/src/test/resources/nosql-test.properties

@ -6,6 +6,8 @@ sql.ts_inserts_fixed_thread_pool_size=10
spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
spring.jpa.properties.hibernate.order_by.default_null_ordering=last
spring.jpa.properties.hibernate.jdbc.log.warnings=false
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=none
spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect

4
dao/src/test/resources/sql-test.properties

@ -7,6 +7,8 @@ sql.ts_key_value_partitioning=MONTHS
#
spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
spring.jpa.properties.hibernate.order_by.default_null_ordering=last
spring.jpa.properties.hibernate.jdbc.log.warnings=false
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=validate
spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect
@ -49,4 +51,4 @@ queue.rule-engine.queues[0].pack-processing-timeout=3000
queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES
queue.rule-engine.queues[0].submit-strategy.type=BURST
sql.log_entity_queries=true
sql.log_entity_queries=true

5
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -161,6 +162,10 @@ public interface TbContext {
RuleNodeId getSelfId();
RuleNode getSelf();
String getRuleChainName();
TenantId getTenantId();
AttributesService getAttributesService();

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceCredentialsUpdateNotificationMsg.java

@ -33,6 +33,8 @@ import java.util.Set;
@Data
public class DeviceCredentialsUpdateNotificationMsg implements ToDeviceActorNotificationMsg {
private static final long serialVersionUID = -3956907402411126990L;
private final TenantId tenantId;
private final DeviceId deviceId;

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java

@ -24,6 +24,9 @@ import org.thingsboard.server.common.msg.MsgType;
@Data
@AllArgsConstructor
public class DeviceNameOrTypeUpdateMsg implements ToDeviceActorNotificationMsg {
private static final long serialVersionUID = -5738949227650536685L;
private final TenantId tenantId;
private final DeviceId deviceId;
private final String deviceName;

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java

@ -79,7 +79,7 @@ public class TbNodeUtils {
}
}
if (jsonNode != null && !jsonNode.isObject() && !jsonNode.isArray()) {
if (jsonNode != null && jsonNode.isValueNode()) {
result = result.replace(String.format(DATA_VARIABLE_TEMPLATE, group), jsonNode.asText());
}
}

19
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java

@ -17,11 +17,13 @@ package org.thingsboard.rule.engine.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
@ -73,8 +75,8 @@ public class TbKafkaNode implements TbNode {
Properties properties = new Properties();
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getValueSerializer()));
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKafkaSerializerClass(config.getKeySerializer()));
properties.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
@ -92,6 +94,19 @@ public class TbKafkaNode implements TbNode {
}
}
private Class<?> getKafkaSerializerClass(String serializerClassName) {
Class<?> serializerClass = null;
if (!StringUtils.isEmpty(serializerClassName)) {
try {
serializerClass = Class.forName(serializerClassName);
} catch (ClassNotFoundException e) {}
}
if (serializerClass == null) {
serializerClass = StringSerializer.class;
}
return serializerClass;
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);

27
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java

@ -55,8 +55,9 @@ class AlarmRuleState {
private final Set<EntityKey> entityKeys;
private PersistedAlarmRuleState state;
private boolean updateFlag;
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, Set<EntityKey> entityKeys, PersistedAlarmRuleState state) {
AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, Set<EntityKey> entityKeys, PersistedAlarmRuleState state, DynamicPredicateValueCtx dynamicPredicateValueCtx) {
this.severity = severity;
this.alarmRule = alarmRule;
this.entityKeys = entityKeys;
@ -80,6 +81,7 @@ class AlarmRuleState {
}
this.requiredDurationInMs = requiredDurationInMs;
this.requiredRepeats = requiredRepeats;
this.dynamicPredicateValueCtx = dynamicPredicateValueCtx;
}
public boolean validateTsUpdate(Set<EntityKey> changedKeys) {
@ -385,15 +387,24 @@ class AlarmRuleState {
private <T> EntityKeyValue getDynamicPredicateValue(DataSnapshot data, FilterPredicateValue<T> value) {
EntityKeyValue ekv = null;
if (value.getDynamicValue() != null) {
ekv = data.getValue(new EntityKey(EntityKeyType.ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
if (ekv == null) {
ekv = data.getValue(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
if (ekv == null) {
ekv = data.getValue(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
switch (value.getDynamicValue().getSourceType()) {
case CURRENT_TENANT:
ekv = dynamicPredicateValueCtx.getTenantValue(value.getDynamicValue().getSourceAttribute());
break;
case CURRENT_CUSTOMER:
ekv = dynamicPredicateValueCtx.getCustomerValue(value.getDynamicValue().getSourceAttribute());
break;
case CURRENT_DEVICE:
ekv = data.getValue(new EntityKey(EntityKeyType.ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
if (ekv == null) {
ekv = data.getValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
ekv = data.getValue(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
if (ekv == null) {
ekv = data.getValue(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
if (ekv == null) {
ekv = data.getValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
}
}
}
}
}
}
return ekv;

11
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java

@ -25,11 +25,14 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState;
import org.thingsboard.rule.engine.profile.state.PersistedAlarmState;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.msg.TbMsg;
@ -57,10 +60,12 @@ class AlarmState {
private volatile TbMsgMetaData lastMsgMetaData;
private volatile String lastMsgQueueName;
private volatile DataSnapshot dataSnapshot;
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
AlarmState(ProfileState deviceProfile, EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState) {
AlarmState(ProfileState deviceProfile, EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState, DynamicPredicateValueCtx dynamicPredicateValueCtx) {
this.deviceProfile = deviceProfile;
this.originator = originator;
this.dynamicPredicateValueCtx = dynamicPredicateValueCtx;
this.updateState(alarmDefinition, alarmState);
}
@ -188,12 +193,12 @@ class AlarmState {
}
}
createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule,
deviceProfile.getCreateAlarmKeys(alarm.getId(), severity), ruleState));
deviceProfile.getCreateAlarmKeys(alarm.getId(), severity), ruleState, dynamicPredicateValueCtx));
});
createRulesSortedBySeverityDesc.sort(Comparator.comparingInt(state -> state.getSeverity().ordinal()));
PersistedAlarmRuleState ruleState = alarmState == null ? null : alarmState.getClearRuleState();
if (alarmDefinition.getClearRule() != null) {
clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), deviceProfile.getClearAlarmKeys(alarm.getId()), ruleState);
clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), deviceProfile.getClearAlarmKeys(alarm.getId()), ruleState, dynamicPredicateValueCtx);
}
}

23
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java

@ -63,11 +63,15 @@ class DeviceState {
private PersistedDeviceState pds;
private DataSnapshot latestValues;
private final ConcurrentMap<String, AlarmState> alarmStates = new ConcurrentHashMap<>();
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
DeviceState(TbContext ctx, TbDeviceProfileNodeConfiguration config, DeviceId deviceId, ProfileState deviceProfile, RuleNodeState state) {
this.persistState = config.isPersistAlarmRulesState();
this.deviceId = deviceId;
this.deviceProfile = deviceProfile;
this.dynamicPredicateValueCtx = new DynamicPredicateValueCtxImpl(ctx.getTenantId(), deviceId, ctx);
if (config.isPersistAlarmRulesState()) {
if (state != null) {
this.state = state;
@ -87,7 +91,7 @@ class DeviceState {
if (pds != null) {
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
alarmStates.computeIfAbsent(alarm.getId(),
a -> new AlarmState(deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
a -> new AlarmState(deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
}
}
}
@ -108,7 +112,7 @@ class DeviceState {
if (alarmStates.containsKey(alarm.getId())) {
alarmStates.get(alarm.getId()).updateState(alarm, getOrInitPersistedAlarmState(alarm));
} else {
alarmStates.putIfAbsent(alarm.getId(), new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
alarmStates.putIfAbsent(alarm.getId(), new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
}
}
}
@ -143,6 +147,9 @@ class DeviceState {
} else if (msg.getType().equals(DataConstants.ALARM_ACK)) {
processAlarmAckNotification(ctx, msg);
} else {
if (msg.getType().equals(DataConstants.ENTITY_ASSIGNED) || msg.getType().equals(DataConstants.ENTITY_UNASSIGNED)) {
dynamicPredicateValueCtx.resetCustomer();
}
ctx.tellSuccess(msg);
}
if (persistState && stateChanged) {
@ -156,7 +163,7 @@ class DeviceState {
Alarm alarmNf = JacksonUtil.fromString(msg.getData(), Alarm.class);
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
stateChanged |= alarmState.processAlarmClear(ctx, alarmNf);
}
ctx.tellSuccess(msg);
@ -167,7 +174,7 @@ class DeviceState {
Alarm alarmNf = JacksonUtil.fromString(msg.getData(), Alarm.class);
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
alarmState.processAckAlarm(alarmNf);
}
ctx.tellSuccess(msg);
@ -195,7 +202,7 @@ class DeviceState {
keys.forEach(key -> latestValues.removeValue(new EntityKey(keyType, key)));
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
stateChanged |= alarmState.process(ctx, msg, latestValues, null);
}
}
@ -214,7 +221,7 @@ class DeviceState {
SnapshotUpdate update = merge(latestValues, attributes, scope);
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
stateChanged |= alarmState.process(ctx, msg, latestValues, update);
}
}
@ -233,7 +240,7 @@ class DeviceState {
if (update.hasUpdate()) {
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
stateChanged |= alarmState.process(ctx, msg, latestValues, update);
}
}
@ -374,7 +381,7 @@ class DeviceState {
}
}
private EntityKeyValue toEntityValue(KvEntry entry) {
public static EntityKeyValue toEntityValue(KvEntry entry) {
switch (entry.getDataType()) {
case STRING:
return EntityKeyValue.fromString(entry.getStrValue().get());

25
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DynamicPredicateValueCtx.java

@ -0,0 +1,25 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.profile;
public interface DynamicPredicateValueCtx {
EntityKeyValue getTenantValue(String key);
EntityKeyValue getCustomerValue(String key);
void resetCustomer();
}

74
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DynamicPredicateValueCtxImpl.java

@ -0,0 +1,74 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.profile;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@Slf4j
public class DynamicPredicateValueCtxImpl implements DynamicPredicateValueCtx {
private final TenantId tenantId;
private CustomerId customerId;
private final DeviceId deviceId;
private final TbContext ctx;
public DynamicPredicateValueCtxImpl(TenantId tenantId, DeviceId deviceId, TbContext ctx) {
this.tenantId = tenantId;
this.deviceId = deviceId;
this.ctx = ctx;
resetCustomer();
}
@Override
public EntityKeyValue getTenantValue(String key) {
return getValue(tenantId, key);
}
@Override
public EntityKeyValue getCustomerValue(String key) {
return customerId == null || customerId.isNullUid() ? null : getValue(customerId, key);
}
@Override
public void resetCustomer() {
Device device = ctx.getDeviceService().findDeviceById(tenantId, deviceId);
if (device != null) {
this.customerId = device.getCustomerId();
}
}
private EntityKeyValue getValue(EntityId entityId, String key) {
try {
Optional<AttributeKvEntry> entry = ctx.getAttributesService().find(tenantId, entityId, DataConstants.SERVER_SCOPE, key).get();
if (entry.isPresent()) {
return DeviceState.toEntityValue(entry.get());
}
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to get attribute by key: {} for {}: [{}]", key, entityId.getEntityType(), entityId.getId());
}
return null;
}
}

4
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/ProfileState.java

@ -95,7 +95,9 @@ class ProfileState {
case NUMERIC:
case BOOLEAN:
DynamicValue value = ((SimpleKeyFilterPredicate) predicate).getValue().getDynamicValue();
if (value != null && value.getSourceType() == DynamicValueSourceType.CURRENT_DEVICE) {
if (value != null && (value.getSourceType() == DynamicValueSourceType.CURRENT_TENANT ||
value.getSourceType() == DynamicValueSourceType.CURRENT_CUSTOMER ||
value.getSourceType() == DynamicValueSourceType.CURRENT_DEVICE)) {
EntityKey entityKey = new EntityKey(EntityKeyType.ATTRIBUTE, value.getSourceAttribute());
entityKeys.add(entityKey);
ruleKeys.add(entityKey);

4
rule-engine/rule-engine-components/src/main/resources/public/static/rulenode/rulenode-core-config.js

File diff suppressed because one or more lines are too long

257
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java

@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.profile;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.AdditionalAnswers;
@ -29,15 +30,22 @@ import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.device.profile.AlarmCondition;
import org.thingsboard.server.common.data.device.profile.AlarmRule;
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.DynamicValueSourceType;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityKeyValueType;
@ -48,12 +56,19 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
@ -71,9 +86,14 @@ public class TbDeviceProfileNodeTest {
private TimeseriesService timeseriesService;
@Mock
private RuleEngineAlarmService alarmService;
@Mock
private DeviceService deviceService;
@Mock
private AttributesService attributesService;
private TenantId tenantId = new TenantId(UUID.randomUUID());
private DeviceId deviceId = new DeviceId(UUID.randomUUID());
private CustomerId customerId = new CustomerId(UUID.randomUUID());
private DeviceProfileId deviceProfileId = new DeviceProfileId(UUID.randomUUID());
@Test
@ -183,11 +203,248 @@ public class TbDeviceProfileNodeTest {
}
@Test
public void testCurrentDeviceAttributeForDynamicValue() throws Exception {
init();
DeviceProfile deviceProfile = new DeviceProfile();
deviceProfile.setId(deviceProfileId);
DeviceProfileData deviceProfileData = new DeviceProfileData();
Device device = new Device();
device.setId(deviceId);
device.setCustomerId(customerId);
AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey(
EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "greaterAttribute"
);
AttributeKvEntity attributeKvEntity = new AttributeKvEntity();
attributeKvEntity.setId(compositeKey);
attributeKvEntity.setLongValue(30L);
attributeKvEntity.setLastUpdateTs(0L);
AttributeKvEntry entry = attributeKvEntity.toData();
ListenableFuture<List<AttributeKvEntry>> listListenableFutureWithLess =
Futures.immediateFuture(Collections.singletonList(entry));
KeyFilter highTempFilter = new KeyFilter();
highTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
highTempFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate highTemperaturePredicate = new NumericFilterPredicate();
highTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
highTemperaturePredicate.setValue(new FilterPredicateValue<>(
0.0,
null,
new DynamicValue<>(DynamicValueSourceType.CURRENT_DEVICE, "greaterAttribute")
));
highTempFilter.setPredicate(highTemperaturePredicate);
AlarmCondition alarmCondition = new AlarmCondition();
alarmCondition.setCondition(Collections.singletonList(highTempFilter));
AlarmRule alarmRule = new AlarmRule();
alarmRule.setCondition(alarmCondition);
DeviceProfileAlarm dpa = new DeviceProfileAlarm();
dpa.setId("highTemperatureAlarmID");
dpa.setAlarmType("highTemperatureAlarm");
dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)));
deviceProfileData.setAlarms(Collections.singletonList(dpa));
deviceProfile.setProfileData(deviceProfileData);
Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
.thenReturn(Futures.immediateFuture(Collections.emptyList()));
Mockito.when(alarmService.findLatestByOriginatorAndType(tenantId, deviceId, "highTemperatureAlarm"))
.thenReturn(Futures.immediateFuture(null));
Mockito.when(alarmService.createOrUpdateAlarm(Mockito.any())).thenAnswer(AdditionalAnswers.returnsFirstArg());
Mockito.when(ctx.getAttributesService()).thenReturn(attributesService);
Mockito.when(attributesService.find(eq(tenantId), eq(deviceId), Mockito.anyString(), Mockito.anySet()))
.thenReturn(listListenableFutureWithLess);
TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "");
Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString()))
.thenReturn(theMsg);
ObjectNode data = mapper.createObjectNode();
data.put("temperature", 35);
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
verify(ctx).tellNext(theMsg, "Alarm Created");
verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
}
@Test
public void testCurrentCustomersAttributeForDynamicValue() throws Exception {
init();
DeviceProfile deviceProfile = new DeviceProfile();
deviceProfile.setId(deviceProfileId);
DeviceProfileData deviceProfileData = new DeviceProfileData();
Device device = new Device();
device.setId(deviceId);
device.setCustomerId(customerId);
AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey(
EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "lessAttribute"
);
AttributeKvEntity attributeKvEntity = new AttributeKvEntity();
attributeKvEntity.setId(compositeKey);
attributeKvEntity.setLongValue(30L);
attributeKvEntity.setLastUpdateTs(0L);
AttributeKvEntry entry = attributeKvEntity.toData();
ListenableFuture<List<AttributeKvEntry>> listListenableFutureWithLess =
Futures.immediateFuture(Collections.singletonList(entry));
ListenableFuture<Optional<AttributeKvEntry>> optionalListenableFutureWithLess =
Futures.immediateFuture(Optional.of(entry));
KeyFilter lowTempFilter = new KeyFilter();
lowTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
lowTempFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate lowTempPredicate = new NumericFilterPredicate();
lowTempPredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS);
lowTempPredicate.setValue(
new FilterPredicateValue<>(
20.0,
null,
new DynamicValue<>(DynamicValueSourceType.CURRENT_CUSTOMER, "lessAttribute"))
);
lowTempFilter.setPredicate(lowTempPredicate);
AlarmCondition alarmCondition = new AlarmCondition();
alarmCondition.setCondition(Collections.singletonList(lowTempFilter));
AlarmRule alarmRule = new AlarmRule();
alarmRule.setCondition(alarmCondition);
DeviceProfileAlarm dpa = new DeviceProfileAlarm();
dpa.setId("lesstempID");
dpa.setAlarmType("lessTemperatureAlarm");
dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)));
deviceProfileData.setAlarms(Collections.singletonList(dpa));
deviceProfile.setProfileData(deviceProfileData);
Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
.thenReturn(Futures.immediateFuture(Collections.emptyList()));
Mockito.when(alarmService.findLatestByOriginatorAndType(tenantId, deviceId, "lessTemperatureAlarm"))
.thenReturn(Futures.immediateFuture(null));
Mockito.when(alarmService.createOrUpdateAlarm(Mockito.any())).thenAnswer(AdditionalAnswers.returnsFirstArg());
Mockito.when(ctx.getAttributesService()).thenReturn(attributesService);
Mockito.when(attributesService.find(eq(tenantId), eq(deviceId), Mockito.anyString(), Mockito.anySet()))
.thenReturn(listListenableFutureWithLess);
Mockito.when(ctx.getDeviceService().findDeviceById(tenantId, deviceId))
.thenReturn(device);
Mockito.when(attributesService.find(eq(tenantId), eq(customerId), eq(DataConstants.SERVER_SCOPE), Mockito.anyString()))
.thenReturn(optionalListenableFutureWithLess);
TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "");
Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString()))
.thenReturn(theMsg);
ObjectNode data = mapper.createObjectNode();
data.put("temperature", 25);
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
verify(ctx).tellNext(theMsg, "Alarm Created");
verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
}
@Test
public void testCurrentTenantAttributeForDynamicValue() throws Exception {
init();
DeviceProfile deviceProfile = new DeviceProfile();
DeviceProfileData deviceProfileData = new DeviceProfileData();
Device device = new Device();
device.setId(deviceId);
device.setCustomerId(customerId);
AttributeKvCompositeKey compositeKey = new AttributeKvCompositeKey(
EntityType.TENANT, deviceId.getId(), "SERVER_SCOPE", "lessAttribute"
);
AttributeKvEntity attributeKvEntity = new AttributeKvEntity();
attributeKvEntity.setId(compositeKey);
attributeKvEntity.setLongValue(50L);
attributeKvEntity.setLastUpdateTs(0L);
AttributeKvEntry entry = attributeKvEntity.toData();
ListenableFuture<List<AttributeKvEntry>> listListenableFutureWithLess =
Futures.immediateFuture(Collections.singletonList(entry));
ListenableFuture<Optional<AttributeKvEntry>> optionalListenableFutureWithLess =
Futures.immediateFuture(Optional.of(entry));
KeyFilter lowTempFilter = new KeyFilter();
lowTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
lowTempFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate lowTempPredicate = new NumericFilterPredicate();
lowTempPredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS);
lowTempPredicate.setValue(
new FilterPredicateValue<>(
32.0,
null,
new DynamicValue<>(DynamicValueSourceType.CURRENT_TENANT, "lessAttribute"))
);
lowTempFilter.setPredicate(lowTempPredicate);
AlarmCondition alarmCondition = new AlarmCondition();
alarmCondition.setCondition(Collections.singletonList(lowTempFilter));
AlarmRule alarmRule = new AlarmRule();
alarmRule.setCondition(alarmCondition);
DeviceProfileAlarm dpa = new DeviceProfileAlarm();
dpa.setId("lesstempID");
dpa.setAlarmType("lessTemperatureAlarm");
dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)));
deviceProfileData.setAlarms(Collections.singletonList(dpa));
deviceProfile.setProfileData(deviceProfileData);
Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
.thenReturn(Futures.immediateFuture(Collections.emptyList()));
Mockito.when(alarmService.findLatestByOriginatorAndType(tenantId, deviceId, "lessTemperatureAlarm"))
.thenReturn(Futures.immediateFuture(null));
Mockito.when(alarmService.createOrUpdateAlarm(Mockito.any()))
.thenAnswer(AdditionalAnswers.returnsFirstArg());
Mockito.when(ctx.getAttributesService()).thenReturn(attributesService);
Mockito.when(attributesService.find(eq(tenantId), eq(deviceId), Mockito.anyString(), Mockito.anySet()))
.thenReturn(listListenableFutureWithLess);
Mockito.when(attributesService.find(eq(tenantId), eq(tenantId), eq(DataConstants.SERVER_SCOPE), Mockito.anyString()))
.thenReturn(optionalListenableFutureWithLess);
TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "");
Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString()))
.thenReturn(theMsg);
ObjectNode data = mapper.createObjectNode();
data.put("temperature", 40);
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
TbMsgDataType.JSON, mapper.writeValueAsString(data), null, null);
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
verify(ctx).tellNext(theMsg, "Alarm Created");
verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
}
private void init() throws TbNodeException {
UUID uuid = new UUID(6041557255264276971L, -9019477126543226049L);
System.out.println(uuid);
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
Mockito.when(ctx.getDeviceProfileCache()).thenReturn(cache);
Mockito.when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
Mockito.when(ctx.getAlarmService()).thenReturn(alarmService);
Mockito.when(ctx.getDeviceService()).thenReturn(deviceService);
Mockito.when(ctx.getAttributesService()).thenReturn(attributesService);
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.createObjectNode());
node = new TbDeviceProfileNode();
node.init(ctx, nodeConfiguration);

2
ui-ngx/src/app/modules/home/components/filter/filter-predicate-value.component.ts

@ -54,7 +54,7 @@ export class FilterPredicateValueComponent implements ControlValueAccessor, OnIn
if (allow) {
this.dynamicValueSourceTypes.push(DynamicValueSourceType.CURRENT_USER);
} else {
this.dynamicValueSourceTypes = [DynamicValueSourceType.CURRENT_DEVICE];
this.dynamicValueSourceTypes.push(DynamicValueSourceType.CURRENT_DEVICE);
}
}

4
ui-ngx/src/assets/locale/locale.constant-el_GR.json

@ -2056,9 +2056,7 @@
"name-required": "Απαιτείται Όνομα",
"configuration": "Διαμόρφωση",
"schedule": "Πρόγραμμα",
"start": "Έναρξη",
"date": "Ημερομηνία",
"time": "Ώρα",
"start-time": "Ώρα έναρξης",
"repeat": "Επανάληψη",
"repeats": "Επαναλήψεις",
"daily": "Καθημερινά",

4
ui-ngx/src/assets/locale/locale.constant-uk_UA.json

@ -1850,9 +1850,7 @@
"name-required": "Необхідно задати ім'я",
"configuration": "Конфігурація",
"schedule": "Розклад",
"start": "Початок",
"date": "Дата",
"time": "Час",
"start-time": "Початок",
"repeat": "Повтор",
"repeats": "Повтори",
"daily": "Щодня",

Loading…
Cancel
Save