|
|
|
@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|
|
|
import org.thingsboard.rule.engine.api.TbNodeException; |
|
|
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|
|
|
import org.thingsboard.server.common.data.StringUtils; |
|
|
|
import org.thingsboard.server.common.data.id.CustomerId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|
|
|
import org.thingsboard.server.common.data.msg.TbMsgType; |
|
|
|
@ -96,7 +97,7 @@ public class TbMsgGeneratorNode implements TbNode { |
|
|
|
if (initialized.compareAndSet(false, true)) { |
|
|
|
this.scriptEngine = ctx.createScriptEngine(config.getScriptLang(), |
|
|
|
ScriptLanguage.TBEL.equals(config.getScriptLang()) ? config.getTbelScript() : config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType"); |
|
|
|
scheduleTickMsg(ctx); |
|
|
|
scheduleTickMsg(ctx, null); |
|
|
|
} |
|
|
|
} else if (initialized.compareAndSet(true, false)) { |
|
|
|
destroy(); |
|
|
|
@ -113,7 +114,7 @@ public class TbMsgGeneratorNode implements TbNode { |
|
|
|
log.trace("onMsg onSuccess callback, took {}ms, config {}, msg {}", sw.stopAndGetTotalTimeMillis(), config, msg); |
|
|
|
if (initialized.get() && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { |
|
|
|
ctx.enqueueForTellNext(m, TbNodeConnectionType.SUCCESS); |
|
|
|
scheduleTickMsg(ctx); |
|
|
|
scheduleTickMsg(ctx, msg); |
|
|
|
currentMsgCount++; |
|
|
|
} |
|
|
|
}, |
|
|
|
@ -121,14 +122,14 @@ public class TbMsgGeneratorNode implements TbNode { |
|
|
|
log.trace("onMsg onFailure callback, took {}ms, config {}, msg {}", sw.stopAndGetTotalTimeMillis(), config, msg, t); |
|
|
|
if (initialized.get() && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { |
|
|
|
ctx.tellFailure(msg, t); |
|
|
|
scheduleTickMsg(ctx); |
|
|
|
scheduleTickMsg(ctx, msg); |
|
|
|
currentMsgCount++; |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void scheduleTickMsg(TbContext ctx) { |
|
|
|
private void scheduleTickMsg(TbContext ctx, TbMsg msg) { |
|
|
|
log.trace("scheduleTickMsg, config {}", config); |
|
|
|
long curTs = System.currentTimeMillis(); |
|
|
|
if (lastScheduledTs == 0L) { |
|
|
|
@ -136,7 +137,8 @@ public class TbMsgGeneratorNode implements TbNode { |
|
|
|
} |
|
|
|
lastScheduledTs = lastScheduledTs + delay; |
|
|
|
long curDelay = Math.max(0L, (lastScheduledTs - curTs)); |
|
|
|
TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); |
|
|
|
TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), |
|
|
|
getCustomerIdFromMsg(msg), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); |
|
|
|
nextTickId = tickMsg.getId(); |
|
|
|
ctx.tellSelf(tickMsg, curDelay); |
|
|
|
} |
|
|
|
@ -159,6 +161,10 @@ public class TbMsgGeneratorNode implements TbNode { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
private CustomerId getCustomerIdFromMsg(TbMsg msg) { |
|
|
|
return msg != null ? msg.getCustomerId() : null; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void destroy() { |
|
|
|
log.trace("destroy, config {}", config); |
|
|
|
|