committed by
GitHub
112 changed files with 2384 additions and 1839 deletions
@ -1,117 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
import akka.actor.ActorRef; |
|||
import org.thingsboard.server.common.msg.core.RuleEngineError; |
|||
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg; |
|||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; |
|||
import org.thingsboard.server.common.msg.session.ToDeviceMsg; |
|||
import org.thingsboard.server.extensions.api.device.DeviceAttributes; |
|||
import org.thingsboard.server.extensions.api.device.DeviceMetaData; |
|||
|
|||
public class ChainProcessingContext { |
|||
|
|||
private final ChainProcessingMetaData md; |
|||
private final int index; |
|||
private final RuleEngineError error; |
|||
private ToDeviceMsg response; |
|||
|
|||
|
|||
public ChainProcessingContext(ChainProcessingMetaData md) { |
|||
super(); |
|||
this.md = md; |
|||
this.index = 0; |
|||
this.error = RuleEngineError.NO_RULES; |
|||
} |
|||
|
|||
private ChainProcessingContext(ChainProcessingContext other, int indexOffset, RuleEngineError error) { |
|||
super(); |
|||
this.md = other.md; |
|||
this.index = other.index + indexOffset; |
|||
this.error = error; |
|||
this.response = other.response; |
|||
|
|||
if (this.index < 0 || this.index >= this.md.chain.size()) { |
|||
throw new IllegalArgumentException("Can't apply offset " + indexOffset + " to the chain!"); |
|||
} |
|||
} |
|||
|
|||
public ActorRef getDeviceActor() { |
|||
return md.originator; |
|||
} |
|||
|
|||
public ActorRef getCurrentActor() { |
|||
return md.chain.getRuleActorMd(index).getActorRef(); |
|||
} |
|||
|
|||
public boolean hasNext() { |
|||
return (getChainLength() - 1) > index; |
|||
} |
|||
|
|||
public boolean isFailure() { |
|||
return (error != null && error.isCritical()) || (response != null && !response.isSuccess()); |
|||
} |
|||
|
|||
public ChainProcessingContext getNext() { |
|||
return new ChainProcessingContext(this, 1, this.error); |
|||
} |
|||
|
|||
public ChainProcessingContext withError(RuleEngineError error) { |
|||
if (error != null && (this.error == null || this.error.getPriority() < error.getPriority())) { |
|||
return new ChainProcessingContext(this, 0, error); |
|||
} else { |
|||
return this; |
|||
} |
|||
} |
|||
|
|||
public int getChainLength() { |
|||
return md.chain.size(); |
|||
} |
|||
|
|||
public ToDeviceActorMsg getInMsg() { |
|||
return md.inMsg; |
|||
} |
|||
|
|||
public DeviceMetaData getDeviceMetaData() { |
|||
return md.deviceMetaData; |
|||
} |
|||
|
|||
public String getDeviceName() { |
|||
return md.deviceMetaData.getDeviceName(); |
|||
} |
|||
|
|||
public String getDeviceType() { |
|||
return md.deviceMetaData.getDeviceType(); |
|||
} |
|||
|
|||
public DeviceAttributes getAttributes() { |
|||
return md.deviceMetaData.getDeviceAttributes(); |
|||
} |
|||
|
|||
public ToDeviceMsg getResponse() { |
|||
return response; |
|||
} |
|||
|
|||
public void mergeResponse(ToDeviceMsg response) { |
|||
// TODO add merge logic
|
|||
this.response = response; |
|||
} |
|||
|
|||
public RuleEngineErrorMsg getError() { |
|||
return new RuleEngineErrorMsg(md.inMsg.getPayload().getMsgType(), error); |
|||
} |
|||
} |
|||
@ -1,41 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
import akka.actor.ActorRef; |
|||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; |
|||
import org.thingsboard.server.extensions.api.device.DeviceMetaData; |
|||
|
|||
/** |
|||
* Immutable part of chain processing data; |
|||
* |
|||
* @author ashvayka |
|||
*/ |
|||
public final class ChainProcessingMetaData { |
|||
|
|||
final RuleActorChain chain; |
|||
final ToDeviceActorMsg inMsg; |
|||
final ActorRef originator; |
|||
final DeviceMetaData deviceMetaData; |
|||
|
|||
public ChainProcessingMetaData(RuleActorChain chain, ToDeviceActorMsg inMsg, DeviceMetaData deviceMetaData, ActorRef originator) { |
|||
super(); |
|||
this.chain = chain; |
|||
this.inMsg = inMsg; |
|||
this.originator = originator; |
|||
this.deviceMetaData = deviceMetaData; |
|||
} |
|||
} |
|||
@ -1,43 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
public class ComplexRuleActorChain implements RuleActorChain { |
|||
|
|||
private final RuleActorChain systemChain; |
|||
private final RuleActorChain tenantChain; |
|||
|
|||
public ComplexRuleActorChain(RuleActorChain systemChain, RuleActorChain tenantChain) { |
|||
super(); |
|||
this.systemChain = systemChain; |
|||
this.tenantChain = tenantChain; |
|||
} |
|||
|
|||
@Override |
|||
public int size() { |
|||
return systemChain.size() + tenantChain.size(); |
|||
} |
|||
|
|||
@Override |
|||
public RuleActorMetaData getRuleActorMd(int index) { |
|||
if (index < systemChain.size()) { |
|||
return systemChain.getRuleActorMd(index); |
|||
} else { |
|||
return tenantChain.getRuleActorMd(index - systemChain.size()); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,90 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.service.ComponentActor; |
|||
import org.thingsboard.server.actors.service.ContextBasedCreator; |
|||
import org.thingsboard.server.actors.stats.StatsPersistTick; |
|||
import org.thingsboard.server.common.data.id.RuleId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg; |
|||
|
|||
public class RuleActor extends ComponentActor<RuleId, RuleActorMessageProcessor> { |
|||
|
|||
private RuleActor(ActorSystemContext systemContext, TenantId tenantId, RuleId ruleId) { |
|||
super(systemContext, tenantId, ruleId); |
|||
setProcessor(new RuleActorMessageProcessor(tenantId, ruleId, systemContext, logger)); |
|||
} |
|||
|
|||
@Override |
|||
public void onReceive(Object msg) throws Exception { |
|||
logger.debug("[{}] Received message: {}", id, msg); |
|||
if (msg instanceof RuleProcessingMsg) { |
|||
try { |
|||
processor.onRuleProcessingMsg(context(), (RuleProcessingMsg) msg); |
|||
increaseMessagesProcessedCount(); |
|||
} catch (Exception e) { |
|||
logAndPersist("onDeviceMsg", e); |
|||
} |
|||
} else if (msg instanceof PluginToRuleMsg<?>) { |
|||
try { |
|||
processor.onPluginMsg(context(), (PluginToRuleMsg<?>) msg); |
|||
} catch (Exception e) { |
|||
logAndPersist("onPluginMsg", e); |
|||
} |
|||
} else if (msg instanceof ComponentLifecycleMsg) { |
|||
onComponentLifecycleMsg((ComponentLifecycleMsg) msg); |
|||
} else if (msg instanceof ClusterEventMsg) { |
|||
onClusterEventMsg((ClusterEventMsg) msg); |
|||
} else if (msg instanceof RuleToPluginTimeoutMsg) { |
|||
try { |
|||
processor.onTimeoutMsg(context(), (RuleToPluginTimeoutMsg) msg); |
|||
} catch (Exception e) { |
|||
logAndPersist("onTimeoutMsg", e); |
|||
} |
|||
} else if (msg instanceof StatsPersistTick) { |
|||
onStatsPersistTick(id); |
|||
} else { |
|||
logger.debug("[{}][{}] Unknown msg type.", tenantId, id, msg.getClass().getName()); |
|||
} |
|||
} |
|||
|
|||
public static class ActorCreator extends ContextBasedCreator<RuleActor> { |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
private final TenantId tenantId; |
|||
private final RuleId ruleId; |
|||
|
|||
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleId ruleId) { |
|||
super(context); |
|||
this.tenantId = tenantId; |
|||
this.ruleId = ruleId; |
|||
} |
|||
|
|||
@Override |
|||
public RuleActor create() throws Exception { |
|||
return new RuleActor(context, tenantId, ruleId); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected long getErrorPersistFrequency() { |
|||
return systemContext.getRuleErrorPersistFrequency(); |
|||
} |
|||
} |
|||
@ -1,345 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
import java.util.*; |
|||
|
|||
import com.fasterxml.jackson.core.JsonProcessingException; |
|||
import org.springframework.util.StringUtils; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper; |
|||
import org.thingsboard.server.actors.shared.ComponentMsgProcessor; |
|||
import org.thingsboard.server.common.data.id.PluginId; |
|||
import org.thingsboard.server.common.data.id.RuleId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
|||
import org.thingsboard.server.common.data.plugin.PluginMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleMetaData; |
|||
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; |
|||
import org.thingsboard.server.common.msg.core.BasicRequest; |
|||
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; |
|||
import org.thingsboard.server.common.msg.core.RuleEngineError; |
|||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; |
|||
import org.thingsboard.server.common.msg.session.MsgType; |
|||
import org.thingsboard.server.common.msg.session.ToDeviceMsg; |
|||
import org.thingsboard.server.common.msg.session.ex.ProcessingTimeoutException; |
|||
import org.thingsboard.server.extensions.api.rules.*; |
|||
import org.thingsboard.server.extensions.api.plugins.PluginAction; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg; |
|||
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
|
|||
import akka.actor.ActorContext; |
|||
import akka.actor.ActorRef; |
|||
import akka.event.LoggingAdapter; |
|||
|
|||
class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|||
|
|||
private final RuleProcessingContext ruleCtx; |
|||
private final Map<UUID, RuleProcessingMsg> pendingMsgMap; |
|||
|
|||
private RuleMetaData ruleMd; |
|||
private ComponentLifecycleState state; |
|||
private List<RuleFilter> filters; |
|||
private RuleProcessor processor; |
|||
private PluginAction action; |
|||
|
|||
private TenantId pluginTenantId; |
|||
private PluginId pluginId; |
|||
|
|||
protected RuleActorMessageProcessor(TenantId tenantId, RuleId ruleId, ActorSystemContext systemContext, LoggingAdapter logger) { |
|||
super(systemContext, logger, tenantId, ruleId); |
|||
this.pendingMsgMap = new HashMap<>(); |
|||
this.ruleCtx = new RuleProcessingContext(systemContext, ruleId); |
|||
} |
|||
|
|||
@Override |
|||
public void start() throws Exception { |
|||
logger.info("[{}][{}] Starting rule actor.", entityId, tenantId); |
|||
ruleMd = systemContext.getRuleService().findRuleById(entityId); |
|||
if (ruleMd == null) { |
|||
throw new RuleInitializationException("Rule not found!"); |
|||
} |
|||
state = ruleMd.getState(); |
|||
if (state == ComponentLifecycleState.ACTIVE) { |
|||
logger.info("[{}] Rule is active. Going to initialize rule components.", entityId); |
|||
initComponent(); |
|||
} else { |
|||
logger.info("[{}] Rule is suspended. Skipping rule components initialization.", entityId); |
|||
} |
|||
|
|||
logger.info("[{}][{}] Started rule actor.", entityId, tenantId); |
|||
} |
|||
|
|||
@Override |
|||
public void stop() throws Exception { |
|||
onStop(); |
|||
} |
|||
|
|||
|
|||
private void initComponent() throws RuleException { |
|||
try { |
|||
if (!ruleMd.getFilters().isArray()) { |
|||
throw new RuntimeException("Filters are not array!"); |
|||
} |
|||
fetchPluginInfo(); |
|||
initFilters(); |
|||
initProcessor(); |
|||
initAction(); |
|||
} catch (RuntimeException e) { |
|||
throw new RuleInitializationException("Unknown runtime exception!", e); |
|||
} catch (InstantiationException e) { |
|||
throw new RuleInitializationException("No default constructor for rule implementation!", e); |
|||
} catch (IllegalAccessException e) { |
|||
throw new RuleInitializationException("Illegal Access Exception during rule initialization!", e); |
|||
} catch (ClassNotFoundException e) { |
|||
throw new RuleInitializationException("Rule Class not found!", e); |
|||
} catch (Exception e) { |
|||
throw new RuleException(e.getMessage(), e); |
|||
} |
|||
} |
|||
|
|||
private void initAction() throws Exception { |
|||
if (ruleMd.getAction() != null && !ruleMd.getAction().isNull()) { |
|||
action = initComponent(ruleMd.getAction()); |
|||
} |
|||
} |
|||
|
|||
private void initProcessor() throws Exception { |
|||
if (ruleMd.getProcessor() != null && !ruleMd.getProcessor().isNull()) { |
|||
processor = initComponent(ruleMd.getProcessor()); |
|||
} |
|||
} |
|||
|
|||
private void initFilters() throws Exception { |
|||
filters = new ArrayList<>(ruleMd.getFilters().size()); |
|||
for (int i = 0; i < ruleMd.getFilters().size(); i++) { |
|||
filters.add(initComponent(ruleMd.getFilters().get(i))); |
|||
} |
|||
} |
|||
|
|||
private void fetchPluginInfo() { |
|||
if (!StringUtils.isEmpty(ruleMd.getPluginToken())) { |
|||
PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken()); |
|||
pluginTenantId = pluginMd.getTenantId(); |
|||
pluginId = pluginMd.getId(); |
|||
} |
|||
} |
|||
|
|||
protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException { |
|||
if (state != ComponentLifecycleState.ACTIVE) { |
|||
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_ACTIVE_RULES); |
|||
return; |
|||
} |
|||
ChainProcessingContext chainCtx = msg.getCtx(); |
|||
ToDeviceActorMsg inMsg = chainCtx.getInMsg(); |
|||
|
|||
ruleCtx.update(inMsg, chainCtx.getDeviceMetaData()); |
|||
|
|||
logger.debug("[{}] Going to filter in msg: {}", entityId, inMsg); |
|||
for (RuleFilter filter : filters) { |
|||
if (!filter.filter(ruleCtx, inMsg)) { |
|||
logger.debug("[{}] In msg is NOT valid for processing by current rule: {}", entityId, inMsg); |
|||
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_FILTERS_MATCHED); |
|||
return; |
|||
} |
|||
} |
|||
RuleProcessingMetaData inMsgMd; |
|||
if (processor != null) { |
|||
logger.debug("[{}] Going to process in msg: {}", entityId, inMsg); |
|||
inMsgMd = processor.process(ruleCtx, inMsg); |
|||
} else { |
|||
inMsgMd = new RuleProcessingMetaData(); |
|||
} |
|||
logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg); |
|||
if (action != null) { |
|||
Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); |
|||
if (ruleToPluginMsgOptional.isPresent()) { |
|||
RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get(); |
|||
logger.debug("[{}] Device msg is converted to: {}", entityId, ruleToPluginMsg); |
|||
context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); |
|||
if (action.isOneWayAction()) { |
|||
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); |
|||
return; |
|||
} else { |
|||
pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); |
|||
scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); |
|||
return; |
|||
} |
|||
} |
|||
} |
|||
logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); |
|||
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); |
|||
} |
|||
|
|||
void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) { |
|||
RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid()); |
|||
if (pendingMsg != null) { |
|||
ChainProcessingContext ctx = pendingMsg.getCtx(); |
|||
Optional<ToDeviceMsg> ruleResponseOptional = action.convert(msg); |
|||
if (ruleResponseOptional.isPresent()) { |
|||
ctx.mergeResponse(ruleResponseOptional.get()); |
|||
pushToNextRule(context, ctx, null); |
|||
} else { |
|||
pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS); |
|||
} |
|||
} else { |
|||
logger.warning("[{}] Processing timeout detected: [{}]", entityId, msg.getUid()); |
|||
} |
|||
} |
|||
|
|||
void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { |
|||
RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId()); |
|||
if (pendingMsg != null) { |
|||
logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg); |
|||
ChainProcessingContext ctx = pendingMsg.getCtx(); |
|||
pushToNextRule(context, ctx, RuleEngineError.PLUGIN_TIMEOUT); |
|||
} |
|||
} |
|||
|
|||
private void pushToNextRule(ActorContext context, ChainProcessingContext ctx, RuleEngineError error) { |
|||
if (error != null) { |
|||
ctx = ctx.withError(error); |
|||
} |
|||
if (ctx.isFailure()) { |
|||
logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); |
|||
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); |
|||
} else if (!ctx.hasNext()) { |
|||
logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); |
|||
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); |
|||
} else { |
|||
logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); |
|||
ChainProcessingContext nextTask = ctx.getNext(); |
|||
nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self()); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onCreated(ActorContext context) { |
|||
logger.info("[{}] Going to process onCreated rule.", entityId); |
|||
} |
|||
|
|||
@Override |
|||
public void onUpdate(ActorContext context) throws RuleException { |
|||
RuleMetaData oldRuleMd = ruleMd; |
|||
ruleMd = systemContext.getRuleService().findRuleById(entityId); |
|||
logger.info("[{}] Rule configuration was updated from {} to {}.", entityId, oldRuleMd, ruleMd); |
|||
try { |
|||
fetchPluginInfo(); |
|||
if (filters == null || !Objects.equals(oldRuleMd.getFilters(), ruleMd.getFilters())) { |
|||
logger.info("[{}] Rule filters require restart due to json change from {} to {}.", |
|||
entityId, mapper.writeValueAsString(oldRuleMd.getFilters()), mapper.writeValueAsString(ruleMd.getFilters())); |
|||
stopFilters(); |
|||
initFilters(); |
|||
} |
|||
if (processor == null || !Objects.equals(oldRuleMd.getProcessor(), ruleMd.getProcessor())) { |
|||
logger.info("[{}] Rule processor require restart due to configuration change.", entityId); |
|||
stopProcessor(); |
|||
initProcessor(); |
|||
} |
|||
if (action == null || !Objects.equals(oldRuleMd.getAction(), ruleMd.getAction())) { |
|||
logger.info("[{}] Rule action require restart due to configuration change.", entityId); |
|||
stopAction(); |
|||
initAction(); |
|||
} |
|||
} catch (RuntimeException e) { |
|||
throw new RuleInitializationException("Unknown runtime exception!", e); |
|||
} catch (InstantiationException e) { |
|||
throw new RuleInitializationException("No default constructor for rule implementation!", e); |
|||
} catch (IllegalAccessException e) { |
|||
throw new RuleInitializationException("Illegal Access Exception during rule initialization!", e); |
|||
} catch (ClassNotFoundException e) { |
|||
throw new RuleInitializationException("Rule Class not found!", e); |
|||
} catch (JsonProcessingException e) { |
|||
throw new RuleInitializationException("Rule configuration is invalid!", e); |
|||
} catch (Exception e) { |
|||
throw new RuleInitializationException(e.getMessage(), e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onActivate(ActorContext context) throws Exception { |
|||
logger.info("[{}] Going to process onActivate rule.", entityId); |
|||
this.state = ComponentLifecycleState.ACTIVE; |
|||
if (filters != null) { |
|||
filters.forEach(RuleLifecycleComponent::resume); |
|||
if (processor != null) { |
|||
processor.resume(); |
|||
} else { |
|||
initProcessor(); |
|||
} |
|||
if (action != null) { |
|||
action.resume(); |
|||
} |
|||
logger.info("[{}] Rule resumed.", entityId); |
|||
} else { |
|||
start(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onSuspend(ActorContext context) { |
|||
logger.info("[{}] Going to process onSuspend rule.", entityId); |
|||
this.state = ComponentLifecycleState.SUSPENDED; |
|||
if (filters != null) { |
|||
filters.forEach(f -> f.suspend()); |
|||
} |
|||
if (processor != null) { |
|||
processor.suspend(); |
|||
} |
|||
if (action != null) { |
|||
action.suspend(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onStop(ActorContext context) { |
|||
logger.info("[{}] Going to process onStop rule.", entityId); |
|||
onStop(); |
|||
scheduleMsgWithDelay(context, new RuleTerminationMsg(entityId), systemContext.getRuleActorTerminationDelay()); |
|||
} |
|||
|
|||
private void onStop() { |
|||
this.state = ComponentLifecycleState.SUSPENDED; |
|||
stopFilters(); |
|||
stopProcessor(); |
|||
stopAction(); |
|||
} |
|||
|
|||
@Override |
|||
public void onClusterEventMsg(ClusterEventMsg msg) throws Exception { |
|||
//Do nothing
|
|||
} |
|||
|
|||
private void stopAction() { |
|||
if (action != null) { |
|||
action.stop(); |
|||
} |
|||
} |
|||
|
|||
private void stopProcessor() { |
|||
if (processor != null) { |
|||
processor.stop(); |
|||
} |
|||
} |
|||
|
|||
private void stopFilters() { |
|||
if (filters != null) { |
|||
filters.forEach(f -> f.stop()); |
|||
} |
|||
} |
|||
} |
|||
@ -1,107 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
import java.util.Comparator; |
|||
|
|||
import org.thingsboard.server.common.data.id.RuleId; |
|||
|
|||
import akka.actor.ActorRef; |
|||
|
|||
public class RuleActorMetaData { |
|||
|
|||
private final RuleId ruleId; |
|||
private final boolean systemRule; |
|||
private final int weight; |
|||
private final ActorRef actorRef; |
|||
|
|||
public static final Comparator<RuleActorMetaData> RULE_ACTOR_MD_COMPARATOR = new Comparator<RuleActorMetaData>() { |
|||
|
|||
@Override |
|||
public int compare(RuleActorMetaData r1, RuleActorMetaData r2) { |
|||
if (r1.isSystemRule() && !r2.isSystemRule()) { |
|||
return 1; |
|||
} else if (!r1.isSystemRule() && r2.isSystemRule()) { |
|||
return -1; |
|||
} else { |
|||
return Integer.compare(r2.getWeight(), r1.getWeight()); |
|||
} |
|||
} |
|||
}; |
|||
|
|||
public static RuleActorMetaData systemRule(RuleId ruleId, int weight, ActorRef actorRef) { |
|||
return new RuleActorMetaData(ruleId, true, weight, actorRef); |
|||
} |
|||
|
|||
public static RuleActorMetaData tenantRule(RuleId ruleId, int weight, ActorRef actorRef) { |
|||
return new RuleActorMetaData(ruleId, false, weight, actorRef); |
|||
} |
|||
|
|||
private RuleActorMetaData(RuleId ruleId, boolean systemRule, int weight, ActorRef actorRef) { |
|||
super(); |
|||
this.ruleId = ruleId; |
|||
this.systemRule = systemRule; |
|||
this.weight = weight; |
|||
this.actorRef = actorRef; |
|||
} |
|||
|
|||
public RuleId getRuleId() { |
|||
return ruleId; |
|||
} |
|||
|
|||
public boolean isSystemRule() { |
|||
return systemRule; |
|||
} |
|||
|
|||
public int getWeight() { |
|||
return weight; |
|||
} |
|||
|
|||
public ActorRef getActorRef() { |
|||
return actorRef; |
|||
} |
|||
|
|||
@Override |
|||
public int hashCode() { |
|||
final int prime = 31; |
|||
int result = 1; |
|||
result = prime * result + ((ruleId == null) ? 0 : ruleId.hashCode()); |
|||
return result; |
|||
} |
|||
|
|||
@Override |
|||
public boolean equals(Object obj) { |
|||
if (this == obj) |
|||
return true; |
|||
if (obj == null) |
|||
return false; |
|||
if (getClass() != obj.getClass()) |
|||
return false; |
|||
RuleActorMetaData other = (RuleActorMetaData) obj; |
|||
if (ruleId == null) { |
|||
if (other.ruleId != null) |
|||
return false; |
|||
} else if (!ruleId.equals(other.ruleId)) |
|||
return false; |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return "RuleActorMetaData [ruleId=" + ruleId + ", systemRule=" + systemRule + ", weight=" + weight + ", actorRef=" + actorRef + "]"; |
|||
} |
|||
|
|||
} |
|||
@ -1,115 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.common.data.Event; |
|||
import org.thingsboard.server.common.data.alarm.Alarm; |
|||
import org.thingsboard.server.common.data.alarm.AlarmId; |
|||
import org.thingsboard.server.common.data.id.*; |
|||
import org.thingsboard.server.dao.alarm.AlarmService; |
|||
import org.thingsboard.server.dao.event.EventService; |
|||
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; |
|||
import org.thingsboard.server.extensions.api.device.DeviceMetaData; |
|||
import org.thingsboard.server.extensions.api.rules.RuleContext; |
|||
|
|||
import java.util.Optional; |
|||
import java.util.concurrent.ExecutionException; |
|||
|
|||
public class RuleProcessingContext implements RuleContext { |
|||
|
|||
private final TimeseriesService tsService; |
|||
private final EventService eventService; |
|||
private final AlarmService alarmService; |
|||
private final RuleId ruleId; |
|||
private TenantId tenantId; |
|||
private CustomerId customerId; |
|||
private DeviceId deviceId; |
|||
private DeviceMetaData deviceMetaData; |
|||
|
|||
RuleProcessingContext(ActorSystemContext systemContext, RuleId ruleId) { |
|||
this.tsService = systemContext.getTsService(); |
|||
this.eventService = systemContext.getEventService(); |
|||
this.alarmService = systemContext.getAlarmService(); |
|||
this.ruleId = ruleId; |
|||
} |
|||
|
|||
void update(ToDeviceActorMsg toDeviceActorMsg, DeviceMetaData deviceMetaData) { |
|||
this.tenantId = toDeviceActorMsg.getTenantId(); |
|||
this.customerId = toDeviceActorMsg.getCustomerId(); |
|||
this.deviceId = toDeviceActorMsg.getDeviceId(); |
|||
this.deviceMetaData = deviceMetaData; |
|||
} |
|||
|
|||
@Override |
|||
public RuleId getRuleId() { |
|||
return ruleId; |
|||
} |
|||
|
|||
@Override |
|||
public DeviceMetaData getDeviceMetaData() { |
|||
return deviceMetaData; |
|||
} |
|||
|
|||
@Override |
|||
public Event save(Event event) { |
|||
checkEvent(event); |
|||
return eventService.save(event); |
|||
} |
|||
|
|||
@Override |
|||
public Optional<Event> saveIfNotExists(Event event) { |
|||
checkEvent(event); |
|||
return eventService.saveIfNotExists(event); |
|||
} |
|||
|
|||
@Override |
|||
public Optional<Event> findEvent(String eventType, String eventUid) { |
|||
return eventService.findEvent(tenantId, deviceId, eventType, eventUid); |
|||
} |
|||
|
|||
@Override |
|||
public Alarm createOrUpdateAlarm(Alarm alarm) { |
|||
alarm.setTenantId(tenantId); |
|||
return alarmService.createOrUpdateAlarm(alarm); |
|||
} |
|||
|
|||
public Optional<Alarm> findLatestAlarm(EntityId originator, String alarmType) { |
|||
try { |
|||
return Optional.ofNullable(alarmService.findLatestByOriginatorAndType(tenantId, originator, alarmType).get()); |
|||
} catch (InterruptedException | ExecutionException e) { |
|||
throw new RuntimeException("Failed to lookup alarm!", e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Boolean> clearAlarm(AlarmId alarmId, long clearTs) { |
|||
return alarmService.clearAlarm(alarmId, clearTs); |
|||
} |
|||
|
|||
private void checkEvent(Event event) { |
|||
if (event.getTenantId() == null) { |
|||
event.setTenantId(tenantId); |
|||
} else if (!tenantId.equals(event.getTenantId())) { |
|||
throw new IllegalArgumentException("Invalid Tenant id!"); |
|||
} |
|||
if (event.getEntityId() == null) { |
|||
event.setEntityId(deviceId); |
|||
} |
|||
} |
|||
} |
|||
@ -1,36 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.rule; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.UUID; |
|||
|
|||
public class RuleToPluginTimeoutMsg implements Serializable { |
|||
|
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
private final UUID msgId; |
|||
|
|||
public RuleToPluginTimeoutMsg(UUID msgId) { |
|||
super(); |
|||
this.msgId = msgId; |
|||
} |
|||
|
|||
public UUID getMsgId() { |
|||
return msgId; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,154 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import org.thingsboard.rule.engine.api.ListeningExecutor; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|||
import org.thingsboard.server.dao.alarm.AlarmService; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.customer.CustomerService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.plugin.PluginService; |
|||
import org.thingsboard.server.dao.relation.RelationService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|||
import org.thingsboard.server.dao.user.UserService; |
|||
|
|||
import java.util.Set; |
|||
|
|||
/** |
|||
* Created by ashvayka on 19.03.18. |
|||
*/ |
|||
class DefaultTbContext implements TbContext { |
|||
|
|||
private final ActorSystemContext mainCtx; |
|||
private final RuleNodeCtx nodeCtx; |
|||
|
|||
public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) { |
|||
this.mainCtx = mainCtx; |
|||
this.nodeCtx = nodeCtx; |
|||
} |
|||
|
|||
@Override |
|||
public void tellNext(TbMsg msg) { |
|||
tellNext(msg, (String) null); |
|||
} |
|||
|
|||
@Override |
|||
public void tellNext(TbMsg msg, String relationType) { |
|||
if (nodeCtx.getSelf().isDebugMode()) { |
|||
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg); |
|||
} |
|||
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor()); |
|||
} |
|||
|
|||
@Override |
|||
public void tellSelf(TbMsg msg, long delayMs) { |
|||
throw new RuntimeException("Not Implemented!"); |
|||
} |
|||
|
|||
@Override |
|||
public void tellOthers(TbMsg msg) { |
|||
throw new RuntimeException("Not Implemented!"); |
|||
} |
|||
|
|||
@Override |
|||
public void tellSibling(TbMsg msg, ServerAddress address) { |
|||
throw new RuntimeException("Not Implemented!"); |
|||
} |
|||
|
|||
@Override |
|||
public void spawn(TbMsg msg) { |
|||
throw new RuntimeException("Not Implemented!"); |
|||
} |
|||
|
|||
@Override |
|||
public void ack(TbMsg msg) { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void tellError(TbMsg msg, Throwable th) { |
|||
if (nodeCtx.getSelf().isDebugMode()) { |
|||
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, th); |
|||
} |
|||
nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor()); |
|||
} |
|||
|
|||
@Override |
|||
public void tellNext(TbMsg msg, Set<String> relationTypes) { |
|||
relationTypes.forEach(type -> tellNext(msg, type)); |
|||
} |
|||
|
|||
@Override |
|||
public ListeningExecutor getJsExecutor() { |
|||
return mainCtx.getExecutor(); |
|||
} |
|||
|
|||
@Override |
|||
public AttributesService getAttributesService() { |
|||
return mainCtx.getAttributesService(); |
|||
} |
|||
|
|||
@Override |
|||
public CustomerService getCustomerService() { |
|||
return mainCtx.getCustomerService(); |
|||
} |
|||
|
|||
@Override |
|||
public UserService getUserService() { |
|||
return mainCtx.getUserService(); |
|||
} |
|||
|
|||
@Override |
|||
public PluginService getPluginService() { |
|||
return mainCtx.getPluginService(); |
|||
} |
|||
|
|||
@Override |
|||
public AssetService getAssetService() { |
|||
return mainCtx.getAssetService(); |
|||
} |
|||
|
|||
@Override |
|||
public DeviceService getDeviceService() { |
|||
return mainCtx.getDeviceService(); |
|||
} |
|||
|
|||
@Override |
|||
public AlarmService getAlarmService() { |
|||
return mainCtx.getAlarmService(); |
|||
} |
|||
|
|||
@Override |
|||
public RuleChainService getRuleChainService() { |
|||
return mainCtx.getRuleChainService(); |
|||
} |
|||
|
|||
@Override |
|||
public TimeseriesService getTimeseriesService() { |
|||
return mainCtx.getTsService(); |
|||
} |
|||
|
|||
@Override |
|||
public RelationService getRelationService() { |
|||
return mainCtx.getRelationService(); |
|||
} |
|||
} |
|||
@ -0,0 +1,88 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import akka.actor.OneForOneStrategy; |
|||
import akka.actor.SupervisorStrategy; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.service.ComponentActor; |
|||
import org.thingsboard.server.actors.service.ContextBasedCreator; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; |
|||
import scala.concurrent.duration.Duration; |
|||
|
|||
public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> { |
|||
|
|||
private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId) { |
|||
super(systemContext, tenantId, ruleChainId); |
|||
setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChainId, systemContext, |
|||
logger, context().parent(), context().self())); |
|||
} |
|||
|
|||
@Override |
|||
protected boolean process(TbActorMsg msg) { |
|||
switch (msg.getMsgType()) { |
|||
case COMPONENT_LIFE_CYCLE_MSG: |
|||
onComponentLifecycleMsg((ComponentLifecycleMsg) msg); |
|||
break; |
|||
case SERVICE_TO_RULE_ENGINE_MSG: |
|||
processor.onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg); |
|||
break; |
|||
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG: |
|||
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); |
|||
break; |
|||
default: |
|||
return false; |
|||
} |
|||
return true; |
|||
} |
|||
|
|||
public static class ActorCreator extends ContextBasedCreator<RuleChainActor> { |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
private final TenantId tenantId; |
|||
private final RuleChainId ruleChainId; |
|||
|
|||
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId pluginId) { |
|||
super(context); |
|||
this.tenantId = tenantId; |
|||
this.ruleChainId = pluginId; |
|||
} |
|||
|
|||
@Override |
|||
public RuleChainActor create() throws Exception { |
|||
return new RuleChainActor(context, tenantId, ruleChainId); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected long getErrorPersistFrequency() { |
|||
return systemContext.getRuleChainErrorPersistFrequency(); |
|||
} |
|||
|
|||
@Override |
|||
public SupervisorStrategy supervisorStrategy() { |
|||
return strategy; |
|||
} |
|||
|
|||
private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> { |
|||
logAndPersist("Unknown Failure", ActorSystemContext.toException(t)); |
|||
return SupervisorStrategy.resume(); |
|||
}); |
|||
} |
|||
@ -0,0 +1,194 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import akka.actor.ActorContext; |
|||
import akka.actor.ActorRef; |
|||
import akka.actor.Props; |
|||
import akka.event.LoggingAdapter; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.service.DefaultActorService; |
|||
import org.thingsboard.server.actors.shared.ComponentMsgProcessor; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.RuleNodeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleNode; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.stream.Collectors; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> { |
|||
|
|||
private final ActorRef parent; |
|||
private final ActorRef self; |
|||
private final Map<RuleNodeId, RuleNodeCtx> nodeActors; |
|||
private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes; |
|||
private final RuleChainService service; |
|||
|
|||
private RuleNodeId firstId; |
|||
private RuleNodeCtx firstNode; |
|||
|
|||
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext |
|||
, LoggingAdapter logger, ActorRef parent, ActorRef self) { |
|||
super(systemContext, logger, tenantId, ruleChainId); |
|||
this.parent = parent; |
|||
this.self = self; |
|||
this.nodeActors = new HashMap<>(); |
|||
this.nodeRoutes = new HashMap<>(); |
|||
this.service = systemContext.getRuleChainService(); |
|||
} |
|||
|
|||
@Override |
|||
public void start(ActorContext context) throws Exception { |
|||
RuleChain ruleChain = service.findRuleChainById(entityId); |
|||
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId); |
|||
// Creating and starting the actors;
|
|||
for (RuleNode ruleNode : ruleNodeList) { |
|||
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); |
|||
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); |
|||
} |
|||
initRoutes(ruleChain, ruleNodeList); |
|||
} |
|||
|
|||
@Override |
|||
public void onUpdate(ActorContext context) throws Exception { |
|||
RuleChain ruleChain = service.findRuleChainById(entityId); |
|||
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId); |
|||
|
|||
for (RuleNode ruleNode : ruleNodeList) { |
|||
RuleNodeCtx existing = nodeActors.get(ruleNode.getId()); |
|||
if (existing == null) { |
|||
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); |
|||
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); |
|||
} else { |
|||
existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self); |
|||
} |
|||
} |
|||
|
|||
Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet()); |
|||
List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList()); |
|||
removedRules.forEach(ruleNodeId -> { |
|||
RuleNodeCtx removed = nodeActors.remove(ruleNodeId); |
|||
removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self); |
|||
}); |
|||
|
|||
initRoutes(ruleChain, ruleNodeList); |
|||
} |
|||
|
|||
@Override |
|||
public void stop(ActorContext context) throws Exception { |
|||
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop); |
|||
nodeActors.clear(); |
|||
nodeRoutes.clear(); |
|||
context.stop(self); |
|||
} |
|||
|
|||
@Override |
|||
public void onClusterEventMsg(ClusterEventMsg msg) throws Exception { |
|||
|
|||
} |
|||
|
|||
private ActorRef createRuleNodeActor(ActorContext context, RuleNode ruleNode) { |
|||
String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ? |
|||
DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME; |
|||
return context.actorOf( |
|||
Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId())) |
|||
.withDispatcher(dispatcherName), ruleNode.getId().toString()); |
|||
} |
|||
|
|||
private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) { |
|||
nodeRoutes.clear(); |
|||
// Populating the routes map;
|
|||
for (RuleNode ruleNode : ruleNodeList) { |
|||
List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId()); |
|||
for (EntityRelation relation : relations) { |
|||
if (relation.getTo().getEntityType() == EntityType.RULE_NODE) { |
|||
RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId())); |
|||
if (ruleNodeCtx == null) { |
|||
throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]"); |
|||
} |
|||
} |
|||
nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>()) |
|||
.add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType())); |
|||
} |
|||
} |
|||
|
|||
firstId = ruleChain.getFirstRuleNodeId(); |
|||
firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId()); |
|||
state = ComponentLifecycleState.ACTIVE; |
|||
} |
|||
|
|||
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) { |
|||
checkActive(); |
|||
TbMsg tbMsg = envelope.getTbMsg(); |
|||
//TODO: push to queue and act on ack in async way
|
|||
pushMstToNode(firstNode, tbMsg); |
|||
} |
|||
|
|||
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) { |
|||
checkActive(); |
|||
RuleNodeId originator = envelope.getOriginator(); |
|||
String targetRelationType = envelope.getRelationType(); |
|||
List<RuleNodeRelation> relations = nodeRoutes.get(originator); |
|||
if (relations == null) { |
|||
return; |
|||
} |
|||
boolean copy = relations.size() > 1; |
|||
for (RuleNodeRelation relation : relations) { |
|||
TbMsg msg = envelope.getMsg(); |
|||
if (copy) { |
|||
msg = msg.copy(); |
|||
} |
|||
if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) { |
|||
switch (relation.getOut().getEntityType()) { |
|||
case RULE_NODE: |
|||
RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId()); |
|||
RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId); |
|||
pushMstToNode(targetRuleNode, msg); |
|||
break; |
|||
case RULE_CHAIN: |
|||
// TODO: implement
|
|||
break; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) { |
|||
if (nodeCtx != null) { |
|||
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,61 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import akka.actor.ActorRef; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.service.ContextAwareActor; |
|||
import org.thingsboard.server.actors.shared.plugin.PluginManager; |
|||
import org.thingsboard.server.actors.shared.rulechain.RuleChainManager; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.PluginId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
|
|||
/** |
|||
* Created by ashvayka on 15.03.18. |
|||
*/ |
|||
public abstract class RuleChainManagerActor extends ContextAwareActor { |
|||
|
|||
protected final RuleChainManager ruleChainManager; |
|||
protected final PluginManager pluginManager; |
|||
protected final RuleChainService ruleChainService; |
|||
|
|||
public RuleChainManagerActor(ActorSystemContext systemContext, RuleChainManager ruleChainManager, PluginManager pluginManager) { |
|||
super(systemContext); |
|||
this.ruleChainManager = ruleChainManager; |
|||
this.pluginManager = pluginManager; |
|||
this.ruleChainService = systemContext.getRuleChainService(); |
|||
} |
|||
|
|||
protected void initRuleChains() { |
|||
pluginManager.init(this.context()); |
|||
ruleChainManager.init(this.context()); |
|||
} |
|||
|
|||
protected ActorRef getEntityActorRef(EntityId entityId) { |
|||
ActorRef target = null; |
|||
switch (entityId.getEntityType()) { |
|||
case PLUGIN: |
|||
target = pluginManager.getOrCreateActor(this.context(), (PluginId) entityId); |
|||
break; |
|||
case RULE_CHAIN: |
|||
target = ruleChainManager.getOrCreateActor(this.context(), (RuleChainId) entityId); |
|||
break; |
|||
} |
|||
return target; |
|||
} |
|||
} |
|||
@ -0,0 +1,96 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.service.ComponentActor; |
|||
import org.thingsboard.server.actors.service.ContextBasedCreator; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.RuleNodeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
|
|||
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> { |
|||
|
|||
private final RuleChainId ruleChainId; |
|||
|
|||
private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { |
|||
super(systemContext, tenantId, ruleNodeId); |
|||
this.ruleChainId = ruleChainId; |
|||
setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext, |
|||
logger, context().parent(), context().self())); |
|||
} |
|||
|
|||
@Override |
|||
protected boolean process(TbActorMsg msg) { |
|||
switch (msg.getMsgType()) { |
|||
case COMPONENT_LIFE_CYCLE_MSG: |
|||
onComponentLifecycleMsg((ComponentLifecycleMsg) msg); |
|||
break; |
|||
case RULE_CHAIN_TO_RULE_MSG: |
|||
onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg); |
|||
break; |
|||
case RULE_TO_SELF_ERROR_MSG: |
|||
onRuleNodeToSelfErrorMsg((RuleNodeToSelfErrorMsg) msg); |
|||
break; |
|||
default: |
|||
return false; |
|||
} |
|||
return true; |
|||
} |
|||
|
|||
private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) { |
|||
logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg()); |
|||
try { |
|||
processor.onRuleChainToRuleNodeMsg(msg); |
|||
increaseMessagesProcessedCount(); |
|||
} catch (Exception e) { |
|||
logAndPersist("onRuleMsg", e); |
|||
} |
|||
} |
|||
|
|||
private void onRuleNodeToSelfErrorMsg(RuleNodeToSelfErrorMsg msg) { |
|||
logAndPersist("onRuleMsg", ActorSystemContext.toException(msg.getError())); |
|||
} |
|||
|
|||
public static class ActorCreator extends ContextBasedCreator<RuleNodeActor> { |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
private final TenantId tenantId; |
|||
private final RuleChainId ruleChainId; |
|||
private final RuleNodeId ruleNodeId; |
|||
|
|||
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { |
|||
super(context); |
|||
this.tenantId = tenantId; |
|||
this.ruleChainId = ruleChainId; |
|||
this.ruleNodeId = ruleNodeId; |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public RuleNodeActor create() throws Exception { |
|||
return new RuleNodeActor(context, tenantId, ruleChainId, ruleNodeId); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected long getErrorPersistFrequency() { |
|||
return systemContext.getRuleNodeErrorPersistFrequency(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,99 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import akka.actor.ActorContext; |
|||
import akka.actor.ActorRef; |
|||
import akka.event.LoggingAdapter; |
|||
import org.thingsboard.rule.engine.api.TbNode; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeState; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.shared.ComponentMsgProcessor; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.RuleNodeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
|||
import org.thingsboard.server.common.data.rule.RuleNode; |
|||
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> { |
|||
|
|||
private final ActorRef parent; |
|||
private final ActorRef self; |
|||
private final RuleChainService service; |
|||
private RuleNode ruleNode; |
|||
private TbNode tbNode; |
|||
|
|||
RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext |
|||
, LoggingAdapter logger, ActorRef parent, ActorRef self) { |
|||
super(systemContext, logger, tenantId, ruleNodeId); |
|||
this.parent = parent; |
|||
this.self = self; |
|||
this.service = systemContext.getRuleChainService(); |
|||
this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(entityId); |
|||
} |
|||
|
|||
@Override |
|||
public void start(ActorContext context) throws Exception { |
|||
tbNode = initComponent(ruleNode); |
|||
state = ComponentLifecycleState.ACTIVE; |
|||
} |
|||
|
|||
@Override |
|||
public void onUpdate(ActorContext context) throws Exception { |
|||
RuleNode newRuleNode = systemContext.getRuleChainService().findRuleNodeById(entityId); |
|||
boolean restartRequired = !(ruleNode.getType().equals(newRuleNode.getType()) |
|||
&& ruleNode.getConfiguration().equals(newRuleNode.getConfiguration())); |
|||
this.ruleNode = newRuleNode; |
|||
if (restartRequired) { |
|||
tbNode.destroy(); |
|||
start(context); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void stop(ActorContext context) throws Exception { |
|||
tbNode.destroy(); |
|||
context.stop(self); |
|||
} |
|||
|
|||
@Override |
|||
public void onClusterEventMsg(ClusterEventMsg msg) throws Exception { |
|||
|
|||
} |
|||
|
|||
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { |
|||
checkActive(); |
|||
if (ruleNode.isDebugMode()) { |
|||
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg()); |
|||
} |
|||
tbNode.onMsg(msg.getCtx(), msg.getMsg()); |
|||
} |
|||
|
|||
private TbNode initComponent(RuleNode ruleNode) throws Exception { |
|||
Class<?> componentClazz = Class.forName(ruleNode.getType()); |
|||
TbNode tbNode = (TbNode) (componentClazz.newInstance()); |
|||
tbNode.init(new TbNodeConfiguration(ruleNode.getConfiguration()), new TbNodeState()); |
|||
return tbNode; |
|||
} |
|||
|
|||
|
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.ruleChain; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.RuleNodeId; |
|||
import org.thingsboard.server.common.msg.MsgType; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
|
|||
/** |
|||
* Created by ashvayka on 19.03.18. |
|||
*/ |
|||
@Data |
|||
final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg { |
|||
|
|||
private final RuleNodeId originator; |
|||
private final String relationType; |
|||
private final TbMsg msg; |
|||
|
|||
@Override |
|||
public MsgType getMsgType() { |
|||
return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,86 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.shared; |
|||
|
|||
import akka.actor.ActorContext; |
|||
import akka.actor.ActorRef; |
|||
import akka.actor.Props; |
|||
import akka.actor.UntypedActor; |
|||
import akka.japi.Creator; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.service.ContextAwareActor; |
|||
import org.thingsboard.server.common.data.SearchTextBased; |
|||
import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.id.UUIDBased; |
|||
import org.thingsboard.server.common.data.page.PageDataIterable; |
|||
import org.thingsboard.server.common.data.plugin.PluginMetaData; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
|
|||
/** |
|||
* Created by ashvayka on 15.03.18. |
|||
*/ |
|||
@Slf4j |
|||
public abstract class EntityActorsManager<T extends EntityId, A extends UntypedActor, M extends SearchTextBased<? extends UUIDBased>> { |
|||
|
|||
protected final ActorSystemContext systemContext; |
|||
protected final Map<T, ActorRef> actors; |
|||
|
|||
public EntityActorsManager(ActorSystemContext systemContext) { |
|||
this.systemContext = systemContext; |
|||
this.actors = new HashMap<>(); |
|||
} |
|||
|
|||
protected abstract TenantId getTenantId(); |
|||
|
|||
protected abstract String getDispatcherName(); |
|||
|
|||
protected abstract Creator<A> creator(T entityId); |
|||
|
|||
protected abstract PageDataIterable.FetchFunction<M> getFetchEntitiesFunction(); |
|||
|
|||
public void init(ActorContext context) { |
|||
for (M entity : new PageDataIterable<>(getFetchEntitiesFunction(), ContextAwareActor.ENTITY_PACK_LIMIT)) { |
|||
T entityId = (T) entity.getId(); |
|||
log.debug("[{}|{}] Creating entity actor", entityId.getEntityType(), entityId.getId()); |
|||
//TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
|
|||
ActorRef actorRef = getOrCreateActor(context, entityId); |
|||
visit(entity, actorRef); |
|||
log.debug("[{}|{}] Entity actor created.", entityId.getEntityType(), entityId.getId()); |
|||
} |
|||
} |
|||
|
|||
protected void visit(M entity, ActorRef actorRef) {} |
|||
|
|||
public ActorRef getOrCreateActor(ActorContext context, T entityId) { |
|||
return actors.computeIfAbsent(entityId, eId -> |
|||
context.actorOf(Props.create(creator(eId)) |
|||
.withDispatcher(getDispatcherName()), eId.toString())); |
|||
} |
|||
|
|||
public void broadcast(Object msg) { |
|||
actors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); |
|||
} |
|||
|
|||
public void remove(T id) { |
|||
actors.remove(id); |
|||
} |
|||
|
|||
} |
|||
@ -1,135 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.shared.rule; |
|||
|
|||
import akka.actor.ActorContext; |
|||
import akka.actor.ActorRef; |
|||
import akka.actor.Props; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.rule.RuleActor; |
|||
import org.thingsboard.server.actors.rule.RuleActorChain; |
|||
import org.thingsboard.server.actors.rule.RuleActorMetaData; |
|||
import org.thingsboard.server.actors.rule.SimpleRuleActorChain; |
|||
import org.thingsboard.server.actors.service.ContextAwareActor; |
|||
import org.thingsboard.server.common.data.id.RuleId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageDataIterable; |
|||
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
|||
import org.thingsboard.server.common.data.rule.RuleMetaData; |
|||
import org.thingsboard.server.dao.rule.RuleService; |
|||
|
|||
import java.util.*; |
|||
|
|||
@Slf4j |
|||
public abstract class RuleManager { |
|||
|
|||
protected final ActorSystemContext systemContext; |
|||
protected final RuleService ruleService; |
|||
protected final Map<RuleId, ActorRef> ruleActors; |
|||
protected final TenantId tenantId; |
|||
|
|||
private Map<RuleMetaData, RuleActorMetaData> ruleMap; |
|||
private RuleActorChain ruleChain; |
|||
|
|||
public RuleManager(ActorSystemContext systemContext, TenantId tenantId) { |
|||
this.systemContext = systemContext; |
|||
this.ruleService = systemContext.getRuleService(); |
|||
this.ruleActors = new HashMap<>(); |
|||
this.tenantId = tenantId; |
|||
} |
|||
|
|||
public void init(ActorContext context) { |
|||
doInit(context); |
|||
} |
|||
|
|||
private void doInit(ActorContext context) { |
|||
PageDataIterable<RuleMetaData> ruleIterator = new PageDataIterable<>(getFetchRulesFunction(), |
|||
ContextAwareActor.ENTITY_PACK_LIMIT); |
|||
ruleMap = new HashMap<>(); |
|||
|
|||
for (RuleMetaData rule : ruleIterator) { |
|||
log.debug("[{}] Creating rule actor {}", rule.getId(), rule); |
|||
ActorRef ref = getOrCreateRuleActor(context, rule.getId()); |
|||
ruleMap.put(rule, RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ref)); |
|||
log.debug("[{}] Rule actor created.", rule.getId()); |
|||
} |
|||
|
|||
refreshRuleChain(); |
|||
} |
|||
|
|||
public Optional<ActorRef> update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) { |
|||
if (ruleMap == null) { |
|||
doInit(context); |
|||
} |
|||
RuleMetaData rule; |
|||
if (event != ComponentLifecycleEvent.DELETED) { |
|||
rule = systemContext.getRuleService().findRuleById(ruleId); |
|||
} else { |
|||
rule = ruleMap.keySet().stream() |
|||
.filter(r -> r.getId().equals(ruleId)) |
|||
.peek(r -> r.setState(ComponentLifecycleState.SUSPENDED)) |
|||
.findFirst() |
|||
.orElse(null); |
|||
if (rule != null) { |
|||
ruleMap.remove(rule); |
|||
ruleActors.remove(ruleId); |
|||
} |
|||
} |
|||
if (rule != null) { |
|||
RuleActorMetaData actorMd = ruleMap.get(rule); |
|||
if (actorMd == null) { |
|||
ActorRef ref = getOrCreateRuleActor(context, rule.getId()); |
|||
actorMd = RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ref); |
|||
ruleMap.put(rule, actorMd); |
|||
} |
|||
refreshRuleChain(); |
|||
return Optional.of(actorMd.getActorRef()); |
|||
} else { |
|||
log.warn("[{}] Can't process unknown rule!", ruleId); |
|||
return Optional.empty(); |
|||
} |
|||
} |
|||
|
|||
abstract FetchFunction<RuleMetaData> getFetchRulesFunction(); |
|||
|
|||
abstract String getDispatcherName(); |
|||
|
|||
public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) { |
|||
return ruleActors.computeIfAbsent(ruleId, rId -> |
|||
context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId)) |
|||
.withDispatcher(getDispatcherName()), rId.toString())); |
|||
} |
|||
|
|||
public RuleActorChain getRuleChain(ActorContext context) { |
|||
if (ruleChain == null) { |
|||
doInit(context); |
|||
} |
|||
return ruleChain; |
|||
} |
|||
|
|||
private void refreshRuleChain() { |
|||
Set<RuleActorMetaData> activeRuleSet = new HashSet<>(); |
|||
for (Map.Entry<RuleMetaData, RuleActorMetaData> rule : ruleMap.entrySet()) { |
|||
if (rule.getKey().getState() == ComponentLifecycleState.ACTIVE) { |
|||
activeRuleSet.add(rule.getValue()); |
|||
} |
|||
} |
|||
ruleChain = new SimpleRuleActorChain(activeRuleSet); |
|||
} |
|||
} |
|||
@ -0,0 +1,59 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.shared.rulechain; |
|||
|
|||
import akka.actor.ActorRef; |
|||
import akka.japi.Creator; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.ruleChain.RuleChainActor; |
|||
import org.thingsboard.server.actors.shared.EntityActorsManager; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
|
|||
/** |
|||
* Created by ashvayka on 15.03.18. |
|||
*/ |
|||
@Slf4j |
|||
public abstract class RuleChainManager extends EntityActorsManager<RuleChainId, RuleChainActor, RuleChain> { |
|||
|
|||
protected final RuleChainService service; |
|||
@Getter |
|||
protected RuleChain rootChain; |
|||
@Getter |
|||
protected ActorRef rootChainActor; |
|||
|
|||
public RuleChainManager(ActorSystemContext systemContext) { |
|||
super(systemContext); |
|||
this.service = systemContext.getRuleChainService(); |
|||
} |
|||
|
|||
@Override |
|||
public Creator<RuleChainActor> creator(RuleChainId entityId) { |
|||
return new RuleChainActor.ActorCreator(systemContext, getTenantId(), entityId); |
|||
} |
|||
|
|||
@Override |
|||
protected void visit(RuleChain entity, ActorRef actorRef) { |
|||
if (entity.isRoot()) { |
|||
rootChain = entity; |
|||
rootChainActor = actorRef; |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,40 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.tenant; |
|||
|
|||
import org.thingsboard.server.actors.rule.RuleActorChain; |
|||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; |
|||
|
|||
public class RuleChainDeviceMsg { |
|||
|
|||
private final ToDeviceActorMsg toDeviceActorMsg; |
|||
private final RuleActorChain ruleChain; |
|||
|
|||
public RuleChainDeviceMsg(ToDeviceActorMsg toDeviceActorMsg, RuleActorChain ruleChain) { |
|||
super(); |
|||
this.toDeviceActorMsg = toDeviceActorMsg; |
|||
this.ruleChain = ruleChain; |
|||
} |
|||
|
|||
public ToDeviceActorMsg getToDeviceActorMsg() { |
|||
return toDeviceActorMsg; |
|||
} |
|||
|
|||
public RuleActorChain getRuleChain() { |
|||
return ruleChain; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,56 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.Event; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.TimePageData; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
|
|||
/** |
|||
* Created by ashvayka on 20.03.18. |
|||
*/ |
|||
public class AbstractRuleEngineControllerTest extends AbstractControllerTest { |
|||
|
|||
protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception { |
|||
return doPost("/api/ruleChain", ruleChain, RuleChain.class); |
|||
} |
|||
|
|||
protected RuleChain getRuleChain(RuleChainId ruleChainId) throws Exception { |
|||
return doGet("/api/ruleChain/" + ruleChainId.getId().toString(), RuleChain.class); |
|||
} |
|||
|
|||
protected RuleChainMetaData saveRuleChainMetaData(RuleChainMetaData ruleChainMD) throws Exception { |
|||
return doPost("/api/ruleChain/metadata", ruleChainMD, RuleChainMetaData.class); |
|||
} |
|||
|
|||
protected RuleChainMetaData getRuleChainMetaData(RuleChainId ruleChainId) throws Exception { |
|||
return doGet("/api/ruleChain/metadata/" + ruleChainId.getId().toString(), RuleChainMetaData.class); |
|||
} |
|||
|
|||
protected TimePageData<Event> getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception { |
|||
TimePageLink pageLink = new TimePageLink(limit); |
|||
return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&", |
|||
new TypeReference<TimePageData<Event>>() { |
|||
}, pageLink, entityId.getEntityType(), entityId.getId(), DataConstants.DEBUG, tenantId.getId()); |
|||
} |
|||
} |
|||
@ -0,0 +1,190 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.rules.flow; |
|||
|
|||
import com.datastax.driver.core.utils.UUIDs; |
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; |
|||
import org.thingsboard.server.actors.service.ActorService; |
|||
import org.thingsboard.server.common.data.*; |
|||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.common.data.page.TimePageData; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleNode; |
|||
import org.thingsboard.server.common.data.security.Authority; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; |
|||
import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
|
|||
import java.util.Arrays; |
|||
import java.util.Collections; |
|||
|
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
|
|||
/** |
|||
* @author Valerii Sosliuk |
|||
*/ |
|||
@Slf4j |
|||
public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRuleEngineControllerTest { |
|||
|
|||
protected Tenant savedTenant; |
|||
protected User tenantAdmin; |
|||
|
|||
@Autowired |
|||
protected ActorService actorService; |
|||
|
|||
@Autowired |
|||
protected AttributesService attributesService; |
|||
|
|||
@Autowired |
|||
protected RuleChainService ruleChainService; |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
Tenant tenant = new Tenant(); |
|||
tenant.setTitle("My tenant"); |
|||
savedTenant = doPost("/api/tenant", tenant, Tenant.class); |
|||
Assert.assertNotNull(savedTenant); |
|||
|
|||
tenantAdmin = new User(); |
|||
tenantAdmin.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin.setTenantId(savedTenant.getId()); |
|||
tenantAdmin.setEmail("tenant2@thingsboard.org"); |
|||
tenantAdmin.setFirstName("Joe"); |
|||
tenantAdmin.setLastName("Downs"); |
|||
|
|||
createUserAndLogin(tenantAdmin, "testPassword1"); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws Exception { |
|||
loginSysAdmin(); |
|||
if (savedTenant != null) { |
|||
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk()); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testRuleChainWithTwoRules() throws Exception { |
|||
// Creating Rule Chain
|
|||
RuleChain ruleChain = new RuleChain(); |
|||
ruleChain.setName("Simple Rule Chain"); |
|||
ruleChain.setTenantId(savedTenant.getId()); |
|||
ruleChain.setRoot(true); |
|||
ruleChain.setDebugMode(true); |
|||
ruleChain = saveRuleChain(ruleChain); |
|||
Assert.assertNull(ruleChain.getFirstRuleNodeId()); |
|||
|
|||
RuleChainMetaData metaData = new RuleChainMetaData(); |
|||
metaData.setRuleChainId(ruleChain.getId()); |
|||
|
|||
RuleNode ruleNode1 = new RuleNode(); |
|||
ruleNode1.setName("Simple Rule Node 1"); |
|||
ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); |
|||
ruleNode1.setDebugMode(true); |
|||
TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration(); |
|||
configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1")); |
|||
ruleNode1.setConfiguration(mapper.valueToTree(configuration1)); |
|||
|
|||
RuleNode ruleNode2 = new RuleNode(); |
|||
ruleNode2.setName("Simple Rule Node 2"); |
|||
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); |
|||
ruleNode2.setDebugMode(true); |
|||
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); |
|||
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); |
|||
ruleNode2.setConfiguration(mapper.valueToTree(configuration2)); |
|||
|
|||
|
|||
metaData.setNodes(Arrays.asList(ruleNode1, ruleNode2)); |
|||
metaData.setFirstNodeIndex(0); |
|||
metaData.addConnectionInfo(0, 1, "Success"); |
|||
metaData = saveRuleChainMetaData(metaData); |
|||
Assert.assertNotNull(metaData); |
|||
|
|||
ruleChain = getRuleChain(ruleChain.getId()); |
|||
Assert.assertNotNull(ruleChain.getFirstRuleNodeId()); |
|||
|
|||
// Saving the device
|
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setType("default"); |
|||
device = doPost("/api/device", device, Device.class); |
|||
|
|||
attributesService.save(device.getId(), DataConstants.SERVER_SCOPE, |
|||
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey1", "serverAttributeValue1"), System.currentTimeMillis()))); |
|||
attributesService.save(device.getId(), DataConstants.SERVER_SCOPE, |
|||
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey2", "serverAttributeValue2"), System.currentTimeMillis()))); |
|||
|
|||
|
|||
Thread.sleep(1000); |
|||
|
|||
// Pushing Message to the system
|
|||
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), |
|||
"CUSTOM", |
|||
device.getId(), |
|||
new TbMsgMetaData(), |
|||
new byte[]{}); |
|||
actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); |
|||
|
|||
Thread.sleep(3000); |
|||
|
|||
TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); |
|||
|
|||
Assert.assertEquals(2, events.getData().size()); |
|||
|
|||
Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); |
|||
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId()); |
|||
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); |
|||
|
|||
Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); |
|||
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId()); |
|||
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); |
|||
|
|||
Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText()); |
|||
|
|||
RuleChain finalRuleChain = ruleChain; |
|||
RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); |
|||
|
|||
events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); |
|||
|
|||
Assert.assertEquals(2, events.getData().size()); |
|||
|
|||
inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); |
|||
Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId()); |
|||
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); |
|||
|
|||
outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); |
|||
Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId()); |
|||
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); |
|||
|
|||
Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText()); |
|||
Assert.assertEquals("serverAttributeValue2", outEvent.getBody().get("metadata").get("ss.serverAttributeKey2").asText()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,158 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.rules.lifecycle; |
|||
|
|||
import com.datastax.driver.core.utils.UUIDs; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; |
|||
import org.thingsboard.server.actors.service.ActorService; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.Event; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.User; |
|||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.common.data.page.TimePageData; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleNode; |
|||
import org.thingsboard.server.common.data.security.Authority; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; |
|||
import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
|
|||
import java.util.Collections; |
|||
|
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
|
|||
/** |
|||
* @author Valerii Sosliuk |
|||
*/ |
|||
@Slf4j |
|||
public abstract class AbstractRuleEngineLifecycleIntegrationTest extends AbstractRuleEngineControllerTest { |
|||
|
|||
protected Tenant savedTenant; |
|||
protected User tenantAdmin; |
|||
|
|||
@Autowired |
|||
protected ActorService actorService; |
|||
|
|||
@Autowired |
|||
protected AttributesService attributesService; |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
Tenant tenant = new Tenant(); |
|||
tenant.setTitle("My tenant"); |
|||
savedTenant = doPost("/api/tenant", tenant, Tenant.class); |
|||
Assert.assertNotNull(savedTenant); |
|||
|
|||
tenantAdmin = new User(); |
|||
tenantAdmin.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin.setTenantId(savedTenant.getId()); |
|||
tenantAdmin.setEmail("tenant2@thingsboard.org"); |
|||
tenantAdmin.setFirstName("Joe"); |
|||
tenantAdmin.setLastName("Downs"); |
|||
|
|||
createUserAndLogin(tenantAdmin, "testPassword1"); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws Exception { |
|||
loginSysAdmin(); |
|||
if (savedTenant != null) { |
|||
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk()); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testRuleChainWithOneRule() throws Exception { |
|||
// Creating Rule Chain
|
|||
RuleChain ruleChain = new RuleChain(); |
|||
ruleChain.setName("Simple Rule Chain"); |
|||
ruleChain.setTenantId(savedTenant.getId()); |
|||
ruleChain.setRoot(true); |
|||
ruleChain.setDebugMode(true); |
|||
ruleChain = saveRuleChain(ruleChain); |
|||
Assert.assertNull(ruleChain.getFirstRuleNodeId()); |
|||
|
|||
RuleChainMetaData metaData = new RuleChainMetaData(); |
|||
metaData.setRuleChainId(ruleChain.getId()); |
|||
|
|||
RuleNode ruleNode = new RuleNode(); |
|||
ruleNode.setName("Simple Rule Node"); |
|||
ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); |
|||
ruleNode.setDebugMode(true); |
|||
TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration(); |
|||
configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey")); |
|||
ruleNode.setConfiguration(mapper.valueToTree(configuration)); |
|||
|
|||
metaData.setNodes(Collections.singletonList(ruleNode)); |
|||
metaData.setFirstNodeIndex(0); |
|||
|
|||
metaData = saveRuleChainMetaData(metaData); |
|||
Assert.assertNotNull(metaData); |
|||
|
|||
ruleChain = getRuleChain(ruleChain.getId()); |
|||
Assert.assertNotNull(ruleChain.getFirstRuleNodeId()); |
|||
|
|||
// Saving the device
|
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setType("default"); |
|||
device = doPost("/api/device", device, Device.class); |
|||
|
|||
attributesService.save(device.getId(), DataConstants.SERVER_SCOPE, |
|||
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis()))); |
|||
|
|||
Thread.sleep(1000); |
|||
|
|||
// Pushing Message to the system
|
|||
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), |
|||
"CUSTOM", |
|||
device.getId(), |
|||
new TbMsgMetaData(), |
|||
new byte[]{}); |
|||
actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); |
|||
|
|||
Thread.sleep(3000); |
|||
|
|||
TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); |
|||
|
|||
Assert.assertEquals(2, events.getData().size()); |
|||
|
|||
Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get(); |
|||
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId()); |
|||
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText()); |
|||
|
|||
Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get(); |
|||
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId()); |
|||
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText()); |
|||
|
|||
Assert.assertEquals("serverAttributeValue", outEvent.getBody().get("metadata").get("ss.serverAttributeKey").asText()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.rules.lifecycle; |
|||
|
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
import org.thingsboard.server.rules.flow.AbstractRuleEngineFlowIntegrationTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 8/22/2017. |
|||
*/ |
|||
@DaoSqlTest |
|||
public class RuleEngineLifecycleSqlIntegrationTest extends AbstractRuleEngineLifecycleIntegrationTest { |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.rule; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
|
|||
/** |
|||
* Created by ashvayka on 21.03.18. |
|||
*/ |
|||
@Data |
|||
public class RuleChainConnectionInfo { |
|||
private int fromIndex; |
|||
private RuleChainId targetRuleChainId; |
|||
private String type; |
|||
} |
|||
@ -0,0 +1,57 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.msg; |
|||
|
|||
/** |
|||
* Created by ashvayka on 15.03.18. |
|||
*/ |
|||
public enum MsgType { |
|||
|
|||
/** |
|||
* ADDED/UPDATED/DELETED events for main entities. |
|||
* |
|||
* @See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg} |
|||
*/ |
|||
COMPONENT_LIFE_CYCLE_MSG, |
|||
|
|||
/** |
|||
* Misc messages from the REST API/SERVICE layer to the new rule engine. |
|||
* |
|||
* @See {@link org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg} |
|||
*/ |
|||
SERVICE_TO_RULE_ENGINE_MSG, |
|||
|
|||
|
|||
SESSION_TO_DEVICE_ACTOR_MSG, |
|||
DEVICE_ACTOR_TO_SESSION_MSG, |
|||
|
|||
|
|||
/** |
|||
* Message that is sent by RuleChainActor to RuleActor with command to process TbMsg. |
|||
*/ |
|||
RULE_CHAIN_TO_RULE_MSG, |
|||
|
|||
/** |
|||
* Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain. |
|||
*/ |
|||
RULE_TO_RULE_CHAIN_TELL_NEXT_MSG, |
|||
|
|||
/** |
|||
* Message that is sent by RuleActor implementation to RuleActor itself to log the error. |
|||
*/ |
|||
RULE_TO_SELF_ERROR_MSG, |
|||
|
|||
} |
|||
@ -1,163 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.service.rule; |
|||
|
|||
import com.datastax.driver.core.utils.UUIDs; |
|||
import org.junit.Assert; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.TextPageData; |
|||
import org.thingsboard.server.common.data.page.TextPageLink; |
|||
import org.thingsboard.server.common.data.plugin.PluginMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleMetaData; |
|||
import org.thingsboard.server.dao.model.ModelConstants; |
|||
import org.thingsboard.server.dao.service.AbstractServiceTest; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.ThreadLocalRandom; |
|||
|
|||
public abstract class BaseRuleServiceTest extends AbstractServiceTest { |
|||
|
|||
@Test |
|||
public void saveRule() throws Exception { |
|||
PluginMetaData plugin = generatePlugin(null, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
pluginService.savePlugin(plugin); |
|||
RuleMetaData ruleMetaData = ruleService.saveRule(generateRule(plugin.getTenantId(), null, plugin.getApiToken())); |
|||
Assert.assertNotNull(ruleMetaData.getId()); |
|||
Assert.assertNotNull(ruleMetaData.getAdditionalInfo()); |
|||
ruleMetaData.setAdditionalInfo(mapper.readTree("{\"description\":\"test\"}")); |
|||
RuleMetaData newRuleMetaData = ruleService.saveRule(ruleMetaData); |
|||
Assert.assertEquals(ruleMetaData.getAdditionalInfo(), newRuleMetaData.getAdditionalInfo()); |
|||
} |
|||
|
|||
@Test |
|||
public void findRuleById() throws Exception { |
|||
PluginMetaData plugin = generatePlugin(null, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
pluginService.savePlugin(plugin); |
|||
|
|||
RuleMetaData expected = ruleService.saveRule(generateRule(plugin.getTenantId(), null, plugin.getApiToken())); |
|||
Assert.assertNotNull(expected.getId()); |
|||
RuleMetaData found = ruleService.findRuleById(expected.getId()); |
|||
Assert.assertEquals(expected, found); |
|||
} |
|||
|
|||
@Test |
|||
public void findPluginRules() throws Exception { |
|||
TenantId tenantIdA = new TenantId(UUIDs.timeBased()); |
|||
TenantId tenantIdB = new TenantId(UUIDs.timeBased()); |
|||
|
|||
PluginMetaData pluginA = generatePlugin(tenantIdA, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
PluginMetaData pluginB = generatePlugin(tenantIdB, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
pluginService.savePlugin(pluginA); |
|||
pluginService.savePlugin(pluginB); |
|||
|
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
|
|||
ruleService.saveRule(generateRule(tenantIdB, null, pluginB.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdB, null, pluginB.getApiToken())); |
|||
|
|||
List<RuleMetaData> foundA = ruleService.findPluginRules(pluginA.getApiToken()); |
|||
Assert.assertEquals(3, foundA.size()); |
|||
|
|||
List<RuleMetaData> foundB = ruleService.findPluginRules(pluginB.getApiToken()); |
|||
Assert.assertEquals(2, foundB.size()); |
|||
} |
|||
|
|||
@Test |
|||
public void findSystemRules() throws Exception { |
|||
TenantId systemTenant = new TenantId(ModelConstants.NULL_UUID); // system tenant id
|
|||
|
|||
PluginMetaData plugin = generatePlugin(systemTenant, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
pluginService.savePlugin(plugin); |
|||
ruleService.saveRule(generateRule(systemTenant, null, plugin.getApiToken())); |
|||
ruleService.saveRule(generateRule(systemTenant, null, plugin.getApiToken())); |
|||
ruleService.saveRule(generateRule(systemTenant, null, plugin.getApiToken())); |
|||
TextPageData<RuleMetaData> found = ruleService.findSystemRules(new TextPageLink(100)); |
|||
Assert.assertEquals(3, found.getData().size()); |
|||
} |
|||
|
|||
@Test |
|||
public void findTenantRules() throws Exception { |
|||
TenantId tenantIdA = new TenantId(UUIDs.timeBased()); |
|||
TenantId tenantIdB = new TenantId(UUIDs.timeBased()); |
|||
|
|||
PluginMetaData pluginA = generatePlugin(tenantIdA, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
PluginMetaData pluginB = generatePlugin(tenantIdB, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
pluginService.savePlugin(pluginA); |
|||
pluginService.savePlugin(pluginB); |
|||
|
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
|
|||
ruleService.saveRule(generateRule(tenantIdB, null, pluginB.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdB, null, pluginB.getApiToken())); |
|||
|
|||
TextPageData<RuleMetaData> foundA = ruleService.findTenantRules(tenantIdA, new TextPageLink(100)); |
|||
Assert.assertEquals(3, foundA.getData().size()); |
|||
|
|||
TextPageData<RuleMetaData> foundB = ruleService.findTenantRules(tenantIdB, new TextPageLink(100)); |
|||
Assert.assertEquals(2, foundB.getData().size()); |
|||
} |
|||
|
|||
@Test |
|||
public void deleteRuleById() throws Exception { |
|||
PluginMetaData plugin = generatePlugin(null, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
pluginService.savePlugin(plugin); |
|||
|
|||
RuleMetaData expected = ruleService.saveRule(generateRule(plugin.getTenantId(), null, plugin.getApiToken())); |
|||
Assert.assertNotNull(expected.getId()); |
|||
RuleMetaData found = ruleService.findRuleById(expected.getId()); |
|||
Assert.assertEquals(expected, found); |
|||
ruleService.deleteRuleById(expected.getId()); |
|||
found = ruleService.findRuleById(expected.getId()); |
|||
Assert.assertNull(found); |
|||
} |
|||
|
|||
@Test |
|||
public void deleteRulesByTenantId() throws Exception { |
|||
TenantId tenantIdA = new TenantId(UUIDs.timeBased()); |
|||
TenantId tenantIdB = new TenantId(UUIDs.timeBased()); |
|||
|
|||
PluginMetaData pluginA = generatePlugin(tenantIdA, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
PluginMetaData pluginB = generatePlugin(tenantIdB, "testPluginToken" + ThreadLocalRandom.current().nextInt()); |
|||
pluginService.savePlugin(pluginA); |
|||
pluginService.savePlugin(pluginB); |
|||
|
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdA, null, pluginA.getApiToken())); |
|||
|
|||
ruleService.saveRule(generateRule(tenantIdB, null, pluginB.getApiToken())); |
|||
ruleService.saveRule(generateRule(tenantIdB, null, pluginB.getApiToken())); |
|||
|
|||
TextPageData<RuleMetaData> foundA = ruleService.findTenantRules(tenantIdA, new TextPageLink(100)); |
|||
Assert.assertEquals(3, foundA.getData().size()); |
|||
|
|||
TextPageData<RuleMetaData> foundB = ruleService.findTenantRules(tenantIdB, new TextPageLink(100)); |
|||
Assert.assertEquals(2, foundB.getData().size()); |
|||
|
|||
ruleService.deleteRulesByTenantId(tenantIdA); |
|||
|
|||
foundA = ruleService.findTenantRules(tenantIdA, new TextPageLink(100)); |
|||
Assert.assertEquals(0, foundA.getData().size()); |
|||
|
|||
foundB = ruleService.findTenantRules(tenantIdB, new TextPageLink(100)); |
|||
Assert.assertEquals(2, foundB.getData().size()); |
|||
} |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.api; |
|||
|
|||
import org.thingsboard.server.common.data.plugin.ComponentScope; |
|||
import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration; |
|||
|
|||
import java.lang.annotation.ElementType; |
|||
import java.lang.annotation.Retention; |
|||
import java.lang.annotation.RetentionPolicy; |
|||
import java.lang.annotation.Target; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@Retention(RetentionPolicy.RUNTIME) |
|||
@Target(ElementType.TYPE) |
|||
public @interface ActionNode { |
|||
|
|||
String name(); |
|||
|
|||
ComponentScope scope() default ComponentScope.TENANT; |
|||
|
|||
String descriptor() default "EmptyNodeDescriptor.json"; |
|||
|
|||
String[] relationTypes() default {"Success","Failure"}; |
|||
|
|||
boolean customRelations() default false; |
|||
|
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.api; |
|||
|
|||
import org.thingsboard.server.common.data.plugin.ComponentScope; |
|||
import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration; |
|||
|
|||
import java.lang.annotation.ElementType; |
|||
import java.lang.annotation.Retention; |
|||
import java.lang.annotation.RetentionPolicy; |
|||
import java.lang.annotation.Target; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@Retention(RetentionPolicy.RUNTIME) |
|||
@Target(ElementType.TYPE) |
|||
public @interface EnrichmentNode { |
|||
|
|||
String name(); |
|||
|
|||
ComponentScope scope() default ComponentScope.TENANT; |
|||
|
|||
String descriptor() default "EmptyNodeDescriptor.json"; |
|||
|
|||
String[] relationTypes() default {"Success","Failure"}; |
|||
|
|||
boolean customRelations() default false; |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.api; |
|||
|
|||
import org.thingsboard.server.common.data.plugin.ComponentScope; |
|||
import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration; |
|||
|
|||
import java.lang.annotation.ElementType; |
|||
import java.lang.annotation.Retention; |
|||
import java.lang.annotation.RetentionPolicy; |
|||
import java.lang.annotation.Target; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@Retention(RetentionPolicy.RUNTIME) |
|||
@Target(ElementType.TYPE) |
|||
public @interface FilterNode { |
|||
|
|||
String name(); |
|||
|
|||
ComponentScope scope() default ComponentScope.TENANT; |
|||
|
|||
String descriptor() default "EmptyNodeDescriptor.json"; |
|||
|
|||
String[] relationTypes() default {"Success","Failure"}; |
|||
|
|||
boolean customRelations() default false; |
|||
|
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.api; |
|||
|
|||
import org.thingsboard.server.common.data.plugin.ComponentScope; |
|||
import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration; |
|||
|
|||
import java.lang.annotation.ElementType; |
|||
import java.lang.annotation.Retention; |
|||
import java.lang.annotation.RetentionPolicy; |
|||
import java.lang.annotation.Target; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@Retention(RetentionPolicy.RUNTIME) |
|||
@Target(ElementType.TYPE) |
|||
public @interface TransformationNode { |
|||
|
|||
String name(); |
|||
|
|||
ComponentScope scope() default ComponentScope.TENANT; |
|||
|
|||
String descriptor() default "EmptyNodeDescriptor.json"; |
|||
|
|||
String[] relationTypes() default {"Success","Failure"}; |
|||
|
|||
boolean customRelations() default false; |
|||
|
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue