|
|
|
@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rule; |
|
|
|
import java.util.*; |
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException; |
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
import org.thingsboard.server.actors.ActorSystemContext; |
|
|
|
import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper; |
|
|
|
import org.thingsboard.server.actors.shared.ComponentMsgProcessor; |
|
|
|
@ -113,8 +114,9 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
|
|
} |
|
|
|
|
|
|
|
private void initAction() throws Exception { |
|
|
|
JsonNode actionMd = ruleMd.getAction(); |
|
|
|
action = initComponent(actionMd); |
|
|
|
if (ruleMd.getAction() != null && !ruleMd.getAction().isNull()) { |
|
|
|
action = initComponent(ruleMd.getAction()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void initProcessor() throws Exception { |
|
|
|
@ -131,9 +133,11 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
|
|
} |
|
|
|
|
|
|
|
private void fetchPluginInfo() { |
|
|
|
PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken()); |
|
|
|
pluginTenantId = pluginMd.getTenantId(); |
|
|
|
pluginId = pluginMd.getId(); |
|
|
|
if (!StringUtils.isEmpty(ruleMd.getPluginToken())) { |
|
|
|
PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken()); |
|
|
|
pluginTenantId = pluginMd.getTenantId(); |
|
|
|
pluginId = pluginMd.getId(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException { |
|
|
|
@ -162,25 +166,27 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
|
|
inMsgMd = new RuleProcessingMetaData(); |
|
|
|
} |
|
|
|
logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg); |
|
|
|
Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); |
|
|
|
if (ruleToPluginMsgOptional.isPresent()) { |
|
|
|
RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get(); |
|
|
|
logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg); |
|
|
|
context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); |
|
|
|
if (action.isOneWayAction()) { |
|
|
|
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); |
|
|
|
} else { |
|
|
|
pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); |
|
|
|
scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); |
|
|
|
if (action != null) { |
|
|
|
Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd); |
|
|
|
if (ruleToPluginMsgOptional.isPresent()) { |
|
|
|
RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get(); |
|
|
|
logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg); |
|
|
|
context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self()); |
|
|
|
if (action.isOneWayAction()) { |
|
|
|
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); |
|
|
|
return; |
|
|
|
} else { |
|
|
|
pendingMsgMap.put(ruleToPluginMsg.getUid(), msg); |
|
|
|
scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout()); |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); |
|
|
|
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_REQUEST_FROM_ACTIONS); |
|
|
|
return; |
|
|
|
} |
|
|
|
logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId); |
|
|
|
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS); |
|
|
|
} |
|
|
|
|
|
|
|
public void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) { |
|
|
|
void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) { |
|
|
|
RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid()); |
|
|
|
if (pendingMsg != null) { |
|
|
|
ChainProcessingContext ctx = pendingMsg.getCtx(); |
|
|
|
@ -196,7 +202,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { |
|
|
|
void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) { |
|
|
|
RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId()); |
|
|
|
if (pendingMsg != null) { |
|
|
|
logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg); |
|
|
|
@ -210,13 +216,13 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
|
|
ctx = ctx.withError(error); |
|
|
|
} |
|
|
|
if (ctx.isFailure()) { |
|
|
|
logger.debug("[{}] Forwarding processing chain to device actor due to failure.", ctx.getInMsg().getDeviceId()); |
|
|
|
logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); |
|
|
|
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); |
|
|
|
} else if (!ctx.hasNext()) { |
|
|
|
logger.debug("[{}] Forwarding processing chain to device actor due to end of chain.", ctx.getInMsg().getDeviceId()); |
|
|
|
logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); |
|
|
|
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); |
|
|
|
} else { |
|
|
|
logger.debug("[{}] Forwarding processing chain to next rule actor.", ctx.getInMsg().getDeviceId()); |
|
|
|
logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); |
|
|
|
ChainProcessingContext nextTask = ctx.getNext(); |
|
|
|
nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self()); |
|
|
|
} |
|
|
|
@ -269,18 +275,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
|
|
public void onActivate(ActorContext context) throws Exception { |
|
|
|
logger.info("[{}] Going to process onActivate rule.", entityId); |
|
|
|
this.state = ComponentLifecycleState.ACTIVE; |
|
|
|
if (action != null) { |
|
|
|
if (filters != null) { |
|
|
|
filters.forEach(f -> f.resume()); |
|
|
|
} else { |
|
|
|
initFilters(); |
|
|
|
} |
|
|
|
if (filters != null) { |
|
|
|
filters.forEach(RuleLifecycleComponent::resume); |
|
|
|
if (processor != null) { |
|
|
|
processor.resume(); |
|
|
|
} else { |
|
|
|
initProcessor(); |
|
|
|
} |
|
|
|
action.resume(); |
|
|
|
if (action != null) { |
|
|
|
action.resume(); |
|
|
|
} |
|
|
|
logger.info("[{}] Rule resumed.", entityId); |
|
|
|
} else { |
|
|
|
start(); |
|
|
|
|