* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,77 +17,127 @@ package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
+import akka.actor.Props;
import akka.event.LoggingAdapter;
-import com.fasterxml.jackson.core.JsonProcessingException;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.plugin.*;
+import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
-import org.thingsboard.server.common.data.id.PluginId;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
-import org.thingsboard.server.common.data.plugin.ComponentType;
-import org.thingsboard.server.common.data.plugin.PluginMetaData;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.extensions.api.plugins.Plugin;
-import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
-import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
-import org.thingsboard.server.extensions.api.rules.RuleException;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* @author Andrew Shvayka
*/
public class RuleChainActorMessageProcessor extends ComponentMsgProcessor {
- private ComponentLifecycleState state;
+ private final ActorRef parent;
+ private final ActorRef self;
+ private final Map nodeActors;
+ private final Map> nodeRoutes;
+ private final RuleChainService service;
+
+ private RuleNodeId firstId;
+ private RuleNodeCtx firstNode;
- protected RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId pluginId, ActorSystemContext systemContext
+ RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
, LoggingAdapter logger, ActorRef parent, ActorRef self) {
- super(systemContext, logger, tenantId, pluginId);
+ super(systemContext, logger, tenantId, ruleChainId);
+ this.parent = parent;
+ this.self = self;
+ this.nodeActors = new HashMap<>();
+ this.nodeRoutes = new HashMap<>();
+ this.service = systemContext.getRuleChainService();
}
@Override
- public void start() throws Exception {
-
+ public void start(ActorContext context) throws Exception {
+ RuleChain ruleChain = service.findRuleChainById(entityId);
+ List ruleNodeList = service.getRuleChainNodes(entityId);
+ // Creating and starting the actors;
+ for (RuleNode ruleNode : ruleNodeList) {
+ String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
+ DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
+ ActorRef ruleNodeActor = context.actorOf(
+ Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
+ .withDispatcher(dispatcherName), ruleNode.toString());
+ nodeActors.put(ruleNode.getId(), new RuleNodeCtx(self, ruleNodeActor, ruleNode.getId()));
+ }
+ // Populating the routes map;
+ for (RuleNode ruleNode : ruleNodeList) {
+ List relations = service.getRuleNodeRelations(ruleNode.getId());
+ for (EntityRelation relation : relations) {
+ if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
+ RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
+ if (ruleNodeCtx == null) {
+ throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+ }
+ }
+ nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
+ .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
+ }
+ }
+
+ firstId = ruleChain.getFirstRuleNodeId();
+ firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
}
@Override
- public void stop() throws Exception {
-
+ public void stop(ActorContext context) throws Exception {
+ nodeActors.values().stream().map(RuleNodeCtx::getSelf).forEach(context::stop);
+ nodeActors.clear();
+ nodeRoutes.clear();
}
@Override
- public void onCreated(ActorContext context) throws Exception {
+ public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
}
- @Override
- public void onUpdate(ActorContext context) throws Exception {
-
+ void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
+ TbMsg tbMsg = envelope.getTbMsg();
+ //TODO: push to queue and act on ack in async way
+ pushMstToNode(firstNode, tbMsg);
}
- @Override
- public void onActivate(ActorContext context) throws Exception {
-
+ void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
+ RuleNodeId originator = envelope.getOriginator();
+ String targetRelationType = envelope.getRelationType();
+ //TODO: log debug output
+ List relations = nodeRoutes.get(originator);
+ for (RuleNodeRelation relation : relations) {
+ if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
+ switch (relation.getOut().getEntityType()) {
+ case RULE_NODE:
+ RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
+ RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
+ pushMstToNode(targetRuleNode, envelope.getMsg());
+ break;
+ case RULE_CHAIN:
+// TODO: implement
+ break;
+ }
+ }
+ }
}
- @Override
- public void onSuspend(ActorContext context) throws Exception {
-
- }
-
- @Override
- public void onStop(ActorContext context) throws Exception {
-
+ private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
+ //TODO: log debug input
+ firstNode.getSelf().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
}
- @Override
- public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
-
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
index 2dca8ddb0d..cb76f04306 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
@@ -8,6 +8,7 @@ import org.thingsboard.server.actors.shared.rulechain.RuleChainManager;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.dao.rule.RuleChainService;
/**
* Created by ashvayka on 15.03.18.
@@ -16,11 +17,13 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
protected final RuleChainManager ruleChainManager;
protected final PluginManager pluginManager;
+ protected final RuleChainService ruleChainService;
public RuleChainManagerActor(ActorSystemContext systemContext, RuleChainManager ruleChainManager, PluginManager pluginManager) {
super(systemContext);
this.ruleChainManager = ruleChainManager;
this.pluginManager = pluginManager;
+ this.ruleChainService = systemContext.getRuleChainService();
}
protected void initRuleChains() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
new file mode 100644
index 0000000000..d6e82621ae
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
@@ -0,0 +1,23 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleChainToRuleNodeMsg implements TbActorMsg {
+
+ private final TbContext ctx;
+ private final TbMsg msg;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.RULE_CHAIN_TO_RULE_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
new file mode 100644
index 0000000000..cba19d17bc
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ComponentActor;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+public class RuleNodeActor extends ComponentActor {
+
+ private final RuleChainId ruleChainId;
+
+ private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
+ super(systemContext, tenantId, ruleNodeId);
+ this.ruleChainId = ruleChainId;
+ setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
+ logger, context().parent(), context().self()));
+ }
+
+ @Override
+ protected void process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case RULE_CHAIN_TO_RULE_MSG:
+ processor.onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
+ break;
+ }
+ }
+
+ public static class ActorCreator extends ContextBasedCreator {
+ private static final long serialVersionUID = 1L;
+
+ private final TenantId tenantId;
+ private final RuleChainId ruleChainId;
+ private final RuleNodeId ruleNodeId;
+
+ public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
+ super(context);
+ this.tenantId = tenantId;
+ this.ruleChainId = ruleChainId;
+ this.ruleNodeId = ruleNodeId;
+
+ }
+
+ @Override
+ public RuleNodeActor create() throws Exception {
+ return new RuleNodeActor(context, tenantId, ruleChainId, ruleNodeId);
+ }
+ }
+
+ @Override
+ protected long getErrorPersistFrequency() {
+ return systemContext.getRuleNodeErrorPersistFrequency();
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
new file mode 100644
index 0000000000..99e48d858d
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.event.LoggingAdapter;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
+import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Andrew Shvayka
+ */
+public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor {
+
+ private final ActorRef parent;
+ private final ActorRef self;
+ private final RuleChainService service;
+
+ RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
+ , LoggingAdapter logger, ActorRef parent, ActorRef self) {
+ super(systemContext, logger, tenantId, ruleNodeId);
+ this.parent = parent;
+ this.self = self;
+ this.service = systemContext.getRuleChainService();
+ }
+
+ @Override
+ public void start(ActorContext context) throws Exception {
+
+ }
+
+ @Override
+ public void stop(ActorContext context) throws Exception {
+ }
+
+ @Override
+ public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+
+ }
+
+ void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
+
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
new file mode 100644
index 0000000000..dd25f8bc85
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
@@ -0,0 +1,15 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorRef;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleNodeCtx {
+ private final ActorRef chainActor;
+ private final ActorRef self;
+ private final RuleNodeId selfId;
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
new file mode 100644
index 0000000000..bd2d544eb3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
@@ -0,0 +1,17 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+
+@Data
+final class RuleNodeRelation {
+
+ private final EntityId in;
+ private final EntityId out;
+ private final String type;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
new file mode 100644
index 0000000000..95b362591d
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -0,0 +1,24 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
+
+ private final RuleNodeId originator;
+ private final String relationType;
+ private final TbMsg msg;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG;
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index 0f66d256ed..0be0385365 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -17,6 +17,8 @@ package org.thingsboard.server.actors.service;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
import org.thingsboard.server.service.cluster.rpc.RpcMsgListener;
@@ -25,6 +27,8 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
+ void onMsg(ServiceToRuleEngineMsg msg);
+
void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index 76b9be96b3..d0260dd974 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -54,7 +54,7 @@ public abstract class ComponentActor extends AbstractContextAwareMsgPr
this.entityId = id;
}
- public abstract void start() throws Exception;
+ public abstract void start(ActorContext context) throws Exception;
- public abstract void stop() throws Exception;
+ public abstract void stop(ActorContext context) throws Exception;
- public abstract void onCreated(ActorContext context) throws Exception;
+ public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
- public abstract void onUpdate(ActorContext context) throws Exception;
+ public void onCreated(ActorContext context) throws Exception {
+ start(context);
+ }
- public abstract void onActivate(ActorContext context) throws Exception;
+ public void onUpdate(ActorContext context) throws Exception {
+ restart(context);
+ }
- public abstract void onSuspend(ActorContext context) throws Exception;
+ public void onActivate(ActorContext context) throws Exception {
+ restart(context);
+ }
- public abstract void onStop(ActorContext context) throws Exception;
+ public void onSuspend(ActorContext context) throws Exception {
+ stop(context);
+ }
- public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
+ public void onStop(ActorContext context) throws Exception {
+ stop(context);
+ }
+
+ private void restart(ActorContext context) throws Exception {
+ stop(context);
+ start(context);
+ }
public void scheduleStatsPersistTick(ActorContext context, long statsPersistFrequency) {
schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
index a98609ceee..917a6452c4 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
@@ -43,13 +43,16 @@ public abstract class EntityActorsManager(getFetchEntitiesFunction(), ContextAwareActor.ENTITY_PACK_LIMIT)) {
- log.debug("[{}] Creating plugin actor", entity.getId());
+ T entityId = (T) entity.getId();
+ log.debug("[{}|{}] Creating entity actor", entityId.getEntityType(), entityId.getId());
//TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
- getOrCreateActor(context, (T) entity.getId());
- log.debug("[{}] Plugin actor created.", entity.getId());
+ ActorRef actorRef = getOrCreateActor(context, entityId);
+ visit(entity, actorRef);
+ log.debug("[{}|{}] Entity actor created.", entityId.getEntityType(), entityId.getId());
}
}
+ protected void visit(M entity, ActorRef actorRef) {}
public ActorRef getOrCreateActor(ActorContext context, T entityId) {
return actors.computeIfAbsent(entityId, eId ->
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
index 09396110b4..97acb6cbfb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
@@ -1,6 +1,8 @@
package org.thingsboard.server.actors.shared.rulechain;
+import akka.actor.ActorRef;
import akka.japi.Creator;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ruleChain.RuleChainActor;
@@ -16,6 +18,10 @@ import org.thingsboard.server.dao.rule.RuleChainService;
public abstract class RuleChainManager extends EntityActorsManager {
protected final RuleChainService service;
+ @Getter
+ protected RuleChain rootChain;
+ @Getter
+ protected ActorRef rootChainActor;
public RuleChainManager(ActorSystemContext systemContext) {
super(systemContext);
@@ -27,4 +33,12 @@ public abstract class RuleChainManager extends EntityActorsManager deviceActors;
@@ -62,25 +62,42 @@ public class TenantActor extends RuleChainManagerActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("[{}] Received message: {}", tenantId, msg);
- if (msg instanceof ToDeviceActorMsg) {
- onToDeviceActorMsg((ToDeviceActorMsg) msg);
- } else if (msg instanceof ToPluginActorMsg) {
- onToPluginMsg((ToPluginActorMsg) msg);
- } else if (msg instanceof ToDeviceActorNotificationMsg) {
- onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
- } else if (msg instanceof ClusterEventMsg) {
- broadcast(msg);
- } else if (msg instanceof ComponentLifecycleMsg) {
- onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
- } else if (msg instanceof PluginTerminationMsg) {
- onPluginTerminated((PluginTerminationMsg) msg);
- } else {
- logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+ protected void process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
+ case SERVICE_TO_RULE_ENGINE_MSG:
+ onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+ break;
}
}
+ private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
+ ruleChainManager.getRootChainActor().tell(msg, self());
+ }
+
+
+// @Override
+// public void onReceive(Object msg) throws Exception {
+// logger.debug("[{}] Received message: {}", tenantId, msg);
+// if (msg instanceof ToDeviceActorMsg) {
+// onToDeviceActorMsg((ToDeviceActorMsg) msg);
+// } else if (msg instanceof ToPluginActorMsg) {
+// onToPluginMsg((ToPluginActorMsg) msg);
+// } else if (msg instanceof ToDeviceActorNotificationMsg) {
+// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
+// } else if (msg instanceof ClusterEventMsg) {
+// broadcast(msg);
+// } else if (msg instanceof ComponentLifecycleMsg) {
+// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+// } else if (msg instanceof PluginTerminationMsg) {
+// onPluginTerminated((PluginTerminationMsg) msg);
+// } else {
+// logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+// }
+// }
+
private void broadcast(Object msg) {
pluginManager.broadcast(msg);
deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 1c842c8a1b..778c2a3ffb 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -215,6 +215,12 @@ actors:
termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
+ chain:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
+ node:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index a776d7b6ce..5624ed801f 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -37,6 +37,8 @@ public class DataConstants {
public static final String ERROR = "ERROR";
public static final String LC_EVENT = "LC_EVENT";
public static final String STATS = "STATS";
+ public static final String RULE_CHAIN_DEBUG = "DEBUG_RULE_CHAIN";
+ public static final String RULE_NODE_DEBUG = "DEBUG_RULE_NODE";
public static final String ONEWAY = "ONEWAY";
public static final String TWOWAY = "TWOWAY";
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
index 45fb590ed6..ab6accaf11 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
@@ -20,6 +20,6 @@ package org.thingsboard.server.common.data.plugin;
*/
public enum ComponentType {
- FILTER, PROCESSOR, ACTION, PLUGIN
+ ENRICHMENT, FILTER, PROCESSOR, TRANSFORMATION, ACTION, PLUGIN
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 55c131d12d..22fc4285a9 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -4,4 +4,34 @@ package org.thingsboard.server.common.msg;
* Created by ashvayka on 15.03.18.
*/
public enum MsgType {
+
+ /**
+ * ADDED/UPDATED/DELETED events for main entities.
+ *
+ * @See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
+ */
+ COMPONENT_LIFE_CYCLE_MSG,
+
+ /**
+ * Misc messages from the REST API/SERVICE layer to the new rule engine.
+ *
+ * @See {@link org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg}
+ */
+ SERVICE_TO_RULE_ENGINE_MSG,
+
+
+ SESSION_TO_DEVICE_ACTOR_MSG,
+ DEVICE_ACTOR_TO_SESSION_MSG,
+
+
+ /**
+ * Message that is sent by RuleChainActor to RuleActor with command to process TbMsg.
+ */
+ RULE_CHAIN_TO_RULE_MSG,
+
+ /**
+ * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
+ */
+ RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
+
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 5163b6c39d..c689b24fe8 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,7 +36,7 @@ public final class TbMsg implements Serializable {
private final String type;
private final EntityId originator;
private final TbMsgMetaData metaData;
-
+ private final TbMsgDataType dataType;
private final byte[] data;
public static ByteBuffer toBytes(TbMsg msg) {
@@ -49,11 +49,10 @@ public final class TbMsg implements Serializable {
}
if (msg.getMetaData() != null) {
- MsgProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
- metadataBuilder.putAllData(msg.getMetaData().getData());
- builder.addMetaData(metadataBuilder.build());
+ builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
}
+ builder.setDataType(msg.getDataType().ordinal());
builder.setData(ByteString.copyFrom(msg.getData()));
byte[] bytes = builder.build().toByteArray();
return ByteBuffer.wrap(bytes);
@@ -63,16 +62,11 @@ public final class TbMsg implements Serializable {
try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
TbMsgMetaData metaData = new TbMsgMetaData();
- if (proto.getMetaDataCount() > 0) {
- metaData.setData(proto.getMetaData(0).getDataMap());
- }
-
- EntityId entityId = null;
- if (proto.getEntityId() != null) {
- entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
- }
+ metaData.setData(proto.getMetaData().getDataMap());
- return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
+ EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+ TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
+ return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData().toByteArray());
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
new file mode 100644
index 0000000000..b6e2d5a5c5
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
@@ -0,0 +1,11 @@
+package org.thingsboard.server.common.msg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+public enum TbMsgDataType {
+
+ // Do not change ordering. We use ordinal to save some bytes on serialization
+ JSON, TEXT, BINARY;
+
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index b77722e826..315eb86f6c 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
@@ -30,7 +32,7 @@ import java.util.Optional;
* @author Andrew Shvayka
*/
@ToString
-public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
+public class ComponentLifecycleMsg implements TbActorMsg, TenantAwareMsg, ToAllNodesMsg {
@Getter
private final TenantId tenantId;
@Getter
@@ -56,4 +58,8 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
return entityId.getEntityType() == EntityType.RULE_CHAIN ? Optional.of((RuleChainId) entityId) : Optional.empty();
}
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.COMPONENT_LIFE_CYCLE_MSG;
+ }
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
new file mode 100644
index 0000000000..3d82033b31
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
@@ -0,0 +1,22 @@
+package org.thingsboard.server.common.msg.system;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+@Data
+public final class ServiceToRuleEngineMsg implements TbActorMsg {
+
+ private final TenantId tenantId;
+ private final TbMsg tbMsg;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SERVICE_TO_RULE_ENGINE_MSG;
+ }
+}
diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto
index 90fa2bdbae..62acff20ce 100644
--- a/common/message/src/main/proto/tbmsg.proto
+++ b/common/message/src/main/proto/tbmsg.proto
@@ -19,6 +19,9 @@ package msgqueue;
option java_package = "org.thingsboard.server.common.msg.gen";
option java_outer_classname = "MsgProtos";
+message TbMsgMetaDataProto {
+ map data = 1;
+}
message TbMsgProto {
string id = 1;
@@ -26,11 +29,8 @@ message TbMsgProto {
string entityType = 3;
string entityId = 4;
- message TbMsgMetaDataProto {
- map data = 1;
- }
+ TbMsgMetaDataProto metaData = 5;
- repeated TbMsgMetaDataProto metaData = 5;
-
- bytes data = 6;
+ int32 dataType = 6;
+ bytes data = 7;
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
index f1df09edb9..fff3f6d907 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.dao.rule;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -67,67 +66,7 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
@Override
public RuleMetaData saveRule(RuleMetaData rule) {
- ruleValidator.validate(rule);
- if (rule.getTenantId() == null) {
- log.trace("Save system rule metadata with predefined id {}", systemTenantId);
- rule.setTenantId(systemTenantId);
- }
- if (rule.getId() != null) {
- RuleMetaData oldVersion = ruleDao.findById(rule.getId());
- if (rule.getState() == null) {
- rule.setState(oldVersion.getState());
- } else if (rule.getState() != oldVersion.getState()) {
- throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
- }
- } else {
- if (rule.getState() == null) {
- rule.setState(ComponentLifecycleState.SUSPENDED);
- } else if (rule.getState() != ComponentLifecycleState.SUSPENDED) {
- throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
- }
- }
-
- validateFilters(rule.getFilters());
- if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
- validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
- }
- if (rule.getAction() != null && !rule.getAction().isNull()) {
- validateComponentJson(rule.getAction(), ComponentType.ACTION);
- }
- validateRuleAndPluginState(rule);
- return ruleDao.save(rule);
- }
-
- private void validateFilters(JsonNode filtersJson) {
- if (filtersJson == null || filtersJson.isNull()) {
- throw new IncorrectParameterException("Rule filters are required!");
- }
- if (!filtersJson.isArray()) {
- throw new IncorrectParameterException("Filters json is not an array!");
- }
- ArrayNode filtersArray = (ArrayNode) filtersJson;
- for (int i = 0; i < filtersArray.size(); i++) {
- validateComponentJson(filtersArray.get(i), ComponentType.FILTER);
- }
- }
-
- private void validateComponentJson(JsonNode json, ComponentType type) {
- if (json == null || json.isNull()) {
- throw new IncorrectParameterException(type.name() + " is required!");
- }
- String clazz = getIfValid(type.name(), json, "clazz", JsonNode::isTextual, JsonNode::asText);
- String name = getIfValid(type.name(), json, "name", JsonNode::isTextual, JsonNode::asText);
- JsonNode configuration = getIfValid(type.name(), json, "configuration", JsonNode::isObject, node -> node);
- ComponentDescriptor descriptor = componentDescriptorService.findByClazz(clazz);
- if (descriptor == null) {
- throw new IncorrectParameterException(type.name() + " clazz " + clazz + " is not a valid component!");
- }
- if (descriptor.getType() != type) {
- throw new IncorrectParameterException("Clazz " + clazz + " is not a valid " + type.name() + " component!");
- }
- if (!componentDescriptorService.validate(descriptor, configuration)) {
- throw new IncorrectParameterException(type.name() + " configuration is not valid!");
- }
+ throw new RuntimeException("Not supported since v1.5!");
}
private void validateRuleAndPluginState(RuleMetaData rule) {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 07cd72c001..a1c85e2bfb 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -38,7 +38,7 @@ public interface TbContext {
void spawn(TbMsg msg);
- void ack(UUID msg);
+ void ack(TbMsg msg);
AttributesService getAttributesService();
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
index 9edd14c953..d1a4adc6b6 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
@@ -44,6 +44,7 @@ public class TbTransformNode implements TbNode {
try {
//TODO: refactor this to work async and fetch attributes from cache.
AttributesService service = ctx.getAttributesService();
+
fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");