Browse Source

Memory leak fix for cases when actors fail to initialize or stopped. Cleanup TbMsgPackProcessingContext as well.

pull/4088/head
Andrii Shvaika 5 years ago
parent
commit
cfb8a72f06
  1. 4
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  2. 23
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 13
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
  4. 30
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
  5. 18
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
  6. 7
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
  7. 2
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
  8. 34
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
  9. 17
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfMsg.java
  10. 42
      application/src/main/java/org/thingsboard/server/actors/ruleChain/TbToRuleNodeActorMsg.java
  11. 8
      application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java
  12. 2
      application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistTick.java
  13. 4
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  14. 5
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
  15. 6
      application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java
  16. 2
      application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
  17. 2
      application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java
  18. 2
      common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java
  19. 2
      common/actor/src/main/java/org/thingsboard/server/actors/TbActorException.java
  20. 20
      common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java
  21. 5
      common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
  22. 8
      common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java
  23. 22
      common/message/src/main/java/org/thingsboard/server/common/msg/TbActorStopReason.java
  24. 25
      common/message/src/main/java/org/thingsboard/server/common/msg/TbRuleEngineActorMsg.java
  25. 38
      common/message/src/main/java/org/thingsboard/server/common/msg/queue/QueueToRuleEngineMsg.java
  26. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeException.java
  27. 5
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  28. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceCredentialsUpdateNotificationMsg.java
  29. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java
  30. 4
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java

4
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -134,12 +134,12 @@ public class AppActor extends ContextAwareActor {
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
if (!deletedTenants.contains(msg.getTenantId())) {
getOrCreateTenantActor(msg.getTenantId()).tell(msg);
} else {
msg.getTbMsg().getCallback().onSuccess();
msg.getMsg().getCallback().onSuccess();
}
}
}

23
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -19,7 +19,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService;
@ -34,7 +33,6 @@ import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
@ -90,10 +88,12 @@ class DefaultTbContext implements TbContext {
public final static ObjectMapper mapper = new ObjectMapper();
private final ActorSystemContext mainCtx;
private final String ruleChainName;
private final RuleNodeCtx nodeCtx;
public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) {
public DefaultTbContext(ActorSystemContext mainCtx, String ruleChainName, RuleNodeCtx nodeCtx) {
this.mainCtx = mainCtx;
this.ruleChainName = ruleChainName;
this.nodeCtx = nodeCtx;
}
@ -117,13 +117,13 @@ class DefaultTbContext implements TbContext {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}
@Override
public void tellSelf(TbMsg msg, long delayMs) {
//TODO: add persistence layer
scheduleMsgWithDelay(new RuleNodeToSelfMsg(msg), delayMs, nodeCtx.getSelfActor());
scheduleMsgWithDelay(new RuleNodeToSelfMsg(this, msg), delayMs, nodeCtx.getSelfActor());
}
@Override
@ -254,7 +254,8 @@ class DefaultTbContext implements TbContext {
} else {
failureMessage = null;
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(),
nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
msg, failureMessage));
}
@ -301,6 +302,16 @@ class DefaultTbContext implements TbContext {
return nodeCtx.getSelf().getId();
}
@Override
public RuleNode getSelf() {
return nodeCtx.getSelf();
}
@Override
public String getRuleChainName() {
return ruleChainName;
}
@Override
public TenantId getTenantId() {
return nodeCtx.getTenantId();

13
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java

@ -23,7 +23,6 @@ import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
@ -197,11 +196,11 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
TbMsg msg = envelope.getTbMsg();
TbMsg msg = envelope.getMsg();
log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
try {
checkActive(envelope.getTbMsg());
checkActive(envelope.getMsg());
RuleNodeId targetId = msg.getRuleNodeId();
RuleNodeCtx targetCtx;
if (targetId == null) {
@ -218,12 +217,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
msg.getCallback().onSuccess();
}
} catch (RuleNodeException rne) {
envelope.getTbMsg().getCallback().onFailure(rne);
envelope.getMsg().getCallback().onFailure(rne);
} catch (Exception e) {
envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
envelope.getMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
}
} else {
onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
onTellNext(envelope.getMsg(), envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
}
}
@ -335,7 +334,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
if (nodeCtx != null) {
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType));
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
} else {
log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));

30
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java

@ -15,24 +15,44 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg {
@EqualsAndHashCode(callSuper = true)
@ToString
public final class RuleChainToRuleChainMsg extends TbRuleEngineActorMsg implements RuleChainAwareMsg {
@Getter
private final RuleChainId target;
@Getter
private final RuleChainId source;
private final TbMsg msg;
@Getter
private final String fromRelationType;
public RuleChainToRuleChainMsg(RuleChainId target, RuleChainId source, TbMsg tbMsg, String fromRelationType) {
super(tbMsg);
this.target = target;
this.source = source;
this.fromRelationType = fromRelationType;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message = reason == TbActorStopReason.STOPPED ? String.format("Rule chain [%s] stopped", target.getId()) : String.format("Failed to initialize rule chain [%s]!", target.getId());
msg.getCallback().onFailure(new RuleEngineException(message));
}
@Override
public RuleChainId getRuleChainId() {
return target;

18
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java

@ -15,22 +15,28 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RuleChainToRuleNodeMsg implements TbActorMsg {
@EqualsAndHashCode(callSuper = true)
@ToString
final class RuleChainToRuleNodeMsg extends TbToRuleNodeActorMsg {
private final TbContext ctx;
private final TbMsg msg;
@Getter
private final String fromRelationType;
public RuleChainToRuleNodeMsg(TbContext ctx, TbMsg tbMsg, String fromRelationType) {
super(ctx, tbMsg);
this.fromRelationType = fromRelationType;
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_CHAIN_TO_RULE_MSG;

7
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java

@ -59,9 +59,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
case RULE_CHAIN_TO_RULE_MSG:
onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
break;
case RULE_TO_SELF_ERROR_MSG:
onRuleNodeToSelfErrorMsg((RuleNodeToSelfErrorMsg) msg);
break;
case RULE_TO_SELF_MSG:
onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg);
break;
@ -101,10 +98,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
}
}
private void onRuleNodeToSelfErrorMsg(RuleNodeToSelfErrorMsg msg) {
logAndPersist("onRuleMsg", ActorSystemContext.toException(msg.getError()));
}
public static class ActorCreator extends ContextBasedCreator {
private final TenantId tenantId;

2
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java

@ -54,7 +54,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
this.ruleChainName = ruleChainName;
this.self = self;
this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
this.defaultCtx = new DefaultTbContext(systemContext, new RuleNodeCtx(tenantId, parent, self, ruleNode));
this.defaultCtx = new DefaultTbContext(systemContext, ruleChainName, new RuleNodeCtx(tenantId, parent, self, ruleNode));
this.info = new RuleNodeInfo(ruleNodeId, ruleChainName, ruleNode != null ? ruleNode.getName() : "Unknown");
}

34
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java

@ -15,11 +15,16 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import java.io.Serializable;
import java.util.Set;
@ -27,15 +32,34 @@ import java.util.Set;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
class RuleNodeToRuleChainTellNextMsg implements TbActorMsg, Serializable {
@EqualsAndHashCode(callSuper = true)
@ToString
class RuleNodeToRuleChainTellNextMsg extends TbRuleEngineActorMsg implements Serializable {
private static final long serialVersionUID = 4577026446412871820L;
@Getter
private final RuleChainId ruleChainId;
@Getter
private final RuleNodeId originator;
@Getter
private final Set<String> relationTypes;
private final TbMsg msg;
@Getter
private final String failureMessage;
public RuleNodeToRuleChainTellNextMsg(RuleChainId ruleChainId, RuleNodeId originator, Set<String> relationTypes, TbMsg tbMsg, String failureMessage) {
super(tbMsg);
this.ruleChainId = ruleChainId;
this.originator = originator;
this.relationTypes = relationTypes;
this.failureMessage = failureMessage;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message = reason == TbActorStopReason.STOPPED ? String.format("Rule chain [%s] stopped", ruleChainId.getId()) : String.format("Failed to initialize rule chain [%s]!", ruleChainId.getId());
msg.getCallback().onFailure(new RuleEngineException(message));
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG;

17
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfMsg.java

@ -15,18 +15,25 @@
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RuleNodeToSelfMsg implements TbActorMsg {
@EqualsAndHashCode(callSuper = true)
@ToString
final class RuleNodeToSelfMsg extends TbToRuleNodeActorMsg {
private final TbMsg msg;
public RuleNodeToSelfMsg(TbContext ctx, TbMsg tbMsg) {
super(ctx, tbMsg);
}
@Override
public MsgType getMsgType() {

42
application/src/main/java/org/thingsboard/server/actors/ruleChain/TbToRuleNodeActorMsg.java

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
@EqualsAndHashCode(callSuper = true)
public abstract class TbToRuleNodeActorMsg extends TbRuleEngineActorMsg {
@Getter
private final TbContext ctx;
public TbToRuleNodeActorMsg(TbContext ctx, TbMsg tbMsg) {
super(tbMsg);
this.ctx = ctx;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message = reason == TbActorStopReason.STOPPED ? "Rule node stopped" : "Failed to initialize rule node!";
msg.getCallback().onFailure(new RuleNodeException(message, ctx.getRuleChainName(), ctx.getSelf()));
}
}

8
application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java

@ -28,10 +28,10 @@ import org.thingsboard.server.common.msg.TbActorMsg;
@ToString
public final class StatsPersistMsg implements TbActorMsg {
private long messagesProcessed;
private long errorsOccurred;
private TenantId tenantId;
private EntityId entityId;
private final long messagesProcessed;
private final long errorsOccurred;
private final TenantId tenantId;
private final EntityId entityId;
@Override
public MsgType getMsgType() {

2
application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistTick.java

@ -18,7 +18,7 @@ package org.thingsboard.server.actors.stats;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
public final class StatsPersistTick implements TbActorMsg{
public final class StatsPersistTick implements TbActorMsg {
@Override
public MsgType getMsgType() {
return MsgType.STATS_PERSIST_TICK_MSG;

4
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -119,7 +119,7 @@ public class TenantActor extends RuleChainManagerActor {
log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
queueMsg.getTbMsg().getCallback().onSuccess();
queueMsg.getMsg().getCallback().onSuccess();
} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
transportMsg.getCallback().onSuccess();
@ -177,7 +177,7 @@ public class TenantActor extends RuleChainManagerActor {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
return;
}
TbMsg tbMsg = msg.getTbMsg();
TbMsg tbMsg = msg.getMsg();
if (apiUsageState.isReExecEnabled()) {
if (tbMsg.getRuleChainId() == null) {
if (getRootChainActor() != null) {

5
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java

@ -181,7 +181,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
new TbMsgPackCallback(id, tenantId, ctx);
try {
if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
if (!toRuleEngineMsg.getTbMsg().isEmpty()) {
forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
} else {
callback.onSuccess();
@ -209,6 +209,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (statsEnabled) {
stats.log(result, decision.isCommit());
}
ctx.cleanup();
if (decision.isCommit()) {
submitStrategy.stop();
break;

6
application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java

@ -147,4 +147,10 @@ public class TbMsgPackProcessingContext {
.forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel()));
}
}
public void cleanup() {
pendingMap.clear();
successMap.clear();
failedMap.clear();
}
}

2
application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java

@ -31,6 +31,8 @@ import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
@RequiredArgsConstructor
public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg {
private static final long serialVersionUID = -8592877558138716589L;
@Getter
private final String serviceId;
@Getter

2
application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java

@ -34,6 +34,8 @@ import java.util.UUID;
@Data
public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAwareMsg, TenantAwareMsg, Serializable {
private static final long serialVersionUID = 7191333353202935941L;
private final TenantId tenantId;
private final DeviceId deviceId;
private final TransportToDeviceActorMsg msg;

2
common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java

@ -30,7 +30,7 @@ public interface TbActor {
}
default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
return InitFailureStrategy.retryWithDelay(5000 * attempt);
return InitFailureStrategy.retryWithDelay(5000L * attempt);
}
default ProcessFailureStrategy onProcessFailure(Throwable t) {

2
common/actor/src/main/java/org/thingsboard/server/actors/TbActorException.java

@ -17,6 +17,8 @@ package org.thingsboard.server.actors;
public class TbActorException extends Exception {
private static final long serialVersionUID = 8209771144711980882L;
public TbActorException(String message, Throwable cause) {
super(message, cause);
}

20
common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java

@ -18,6 +18,7 @@ package org.thingsboard.server.actors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -49,6 +50,7 @@ public final class TbActorMailbox implements TbActorCtx {
private final AtomicBoolean busy = new AtomicBoolean(FREE);
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
private final AtomicBoolean destroyInProgress = new AtomicBoolean();
private volatile TbActorStopReason stopReason;
public void initActor() {
dispatcher.getExecutor().execute(() -> tryInit(1));
@ -70,6 +72,7 @@ public final class TbActorMailbox implements TbActorCtx {
InitFailureStrategy strategy = actor.onInitFailure(attempt, t);
if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
stopReason = TbActorStopReason.INIT_FAILED;
system.stop(selfId);
} else if (strategy.getRetryDelay() > 0) {
log.info("[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", selfId, attempt, strategy.getRetryDelay());
@ -84,12 +87,16 @@ public final class TbActorMailbox implements TbActorCtx {
}
private void enqueue(TbActorMsg msg, boolean highPriority) {
if (highPriority) {
highPriorityMsgs.add(msg);
if (!destroyInProgress.get()) {
if (highPriority) {
highPriorityMsgs.add(msg);
} else {
normalPriorityMsgs.add(msg);
}
tryProcessQueue(true);
} else {
normalPriorityMsgs.add(msg);
msg.onTbActorStopped(stopReason);
}
tryProcessQueue(true);
}
private void tryProcessQueue(boolean newMsg) {
@ -180,11 +187,16 @@ public final class TbActorMailbox implements TbActorCtx {
}
public void destroy() {
if (stopReason == null) {
stopReason = TbActorStopReason.STOPPED;
}
destroyInProgress.set(true);
dispatcher.getExecutor().execute(() -> {
try {
ready.set(NOT_READY);
actor.destroy();
highPriorityMsgs.forEach(msg -> msg.onTbActorStopped(stopReason));
normalPriorityMsgs.forEach(msg -> msg.onTbActorStopped(stopReason));
} catch (Throwable t) {
log.warn("[{}] Failed to destroy actor: {}", selfId, t);
}

5
common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java

@ -66,11 +66,6 @@ public enum MsgType {
*/
REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG,
/**
* Message that is sent by RuleActor implementation to RuleActor itself to log the error.
*/
RULE_TO_SELF_ERROR_MSG,
/**
* Message that is sent by RuleActor implementation to RuleActor itself to process the message.
*/

8
common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java

@ -22,4 +22,12 @@ public interface TbActorMsg {
MsgType getMsgType();
/**
* Executed when the target TbActor is stopped or destroyed.
* For example, rule node failed to initialize or removed from rule chain.
* Implementation should cleanup the resources.
*/
default void onTbActorStopped(TbActorStopReason reason) {
}
}

22
common/message/src/main/java/org/thingsboard/server/common/msg/TbActorStopReason.java

@ -0,0 +1,22 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
public enum TbActorStopReason {
INIT_FAILED, STOPPED
}

25
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java → common/message/src/main/java/org/thingsboard/server/common/msg/TbRuleEngineActorMsg.java

@ -13,25 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
package org.thingsboard.server.common.msg;
import lombok.Data;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import lombok.EqualsAndHashCode;
import lombok.Getter;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RuleNodeToSelfErrorMsg implements TbActorMsg {
@EqualsAndHashCode
public abstract class TbRuleEngineActorMsg implements TbActorMsg {
private final TbMsg msg;
private final Throwable error;
@Getter
protected final TbMsg msg;
@Override
public MsgType getMsgType() {
return MsgType.RULE_TO_SELF_ERROR_MSG;
public TbRuleEngineActorMsg(TbMsg msg) {
this.msg = msg;
}
}

38
common/message/src/main/java/org/thingsboard/server/common/msg/queue/QueueToRuleEngineMsg.java

@ -15,32 +15,58 @@
*/
package org.thingsboard.server.common.msg.queue;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import java.io.Serializable;
import java.util.Set;
/**
* Created by ashvayka on 15.03.18.
*/
@Data
public final class QueueToRuleEngineMsg implements TbActorMsg {
@ToString
@EqualsAndHashCode(callSuper = true)
public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg {
@Getter
private final TenantId tenantId;
private final TbMsg tbMsg;
@Getter
private final Set<String> relationTypes;
@Getter
private final String failureMessage;
public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) {
super(tbMsg);
this.tenantId = tenantId;
this.relationTypes = relationTypes;
this.failureMessage = failureMessage;
}
@Override
public MsgType getMsgType() {
return MsgType.QUEUE_TO_RULE_ENGINE_MSG;
}
@Override
public void onTbActorStopped(TbActorStopReason reason) {
String message;
if (msg.getRuleChainId() != null) {
message = reason == TbActorStopReason.STOPPED ?
String.format("Rule chain [%s] stopped", msg.getRuleChainId().getId()) :
String.format("Failed to initialize rule chain [%s]!", msg.getRuleChainId().getId());
} else {
message = reason == TbActorStopReason.STOPPED ? "Rule chain stopped" : "Failed to initialize rule chain!";
}
msg.getCallback().onFailure(new RuleEngineException(message));
}
public boolean isTellNext() {
return relationTypes != null && !relationTypes.isEmpty();
}
}

4
common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeException.java

@ -24,6 +24,9 @@ import org.thingsboard.server.common.data.rule.RuleNode;
@Slf4j
public class RuleNodeException extends RuleEngineException {
private static final long serialVersionUID = -1776681087370749776L;
@Getter
private final String ruleChainName;
@Getter
@ -33,6 +36,7 @@ public class RuleNodeException extends RuleEngineException {
@Getter
private final RuleNodeId ruleNodeId;
public RuleNodeException(String message, String ruleChainName, RuleNode ruleNode) {
super(message);
this.ruleChainName = ruleChainName;

5
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -161,6 +162,10 @@ public interface TbContext {
RuleNodeId getSelfId();
RuleNode getSelf();
String getRuleChainName();
TenantId getTenantId();
AttributesService getAttributesService();

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceCredentialsUpdateNotificationMsg.java

@ -31,6 +31,8 @@ import java.util.Set;
@Data
public class DeviceCredentialsUpdateNotificationMsg implements ToDeviceActorNotificationMsg {
private static final long serialVersionUID = -3956907402411126990L;
private final TenantId tenantId;
private final DeviceId deviceId;

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java

@ -24,6 +24,9 @@ import org.thingsboard.server.common.msg.MsgType;
@Data
@AllArgsConstructor
public class DeviceNameOrTypeUpdateMsg implements ToDeviceActorNotificationMsg {
private static final long serialVersionUID = -5738949227650536685L;
private final TenantId tenantId;
private final DeviceId deviceId;
private final String deviceName;

4
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java

@ -435,6 +435,10 @@ public class TbDeviceProfileNodeTest {
}
private void init() throws TbNodeException {
UUID uuid = new UUID(6041557255264276971L, -9019477126543226049L);
System.out.println(uuid);
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
Mockito.when(ctx.getDeviceProfileCache()).thenReturn(cache);
Mockito.when(ctx.getTimeseriesService()).thenReturn(timeseriesService);

Loading…
Cancel
Save