|
|
|
@ -137,7 +137,7 @@ public class TbMsgGeneratorNode implements TbNode { |
|
|
|
} |
|
|
|
lastScheduledTs = lastScheduledTs + delay; |
|
|
|
long curDelay = Math.max(0L, (lastScheduledTs - curTs)); |
|
|
|
TbMsg tickMsg = ctx.newMsg(null, TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); |
|
|
|
TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); |
|
|
|
nextTickId = tickMsg.getId(); |
|
|
|
ctx.tellSelf(tickMsg, curDelay); |
|
|
|
} |
|
|
|
@ -145,14 +145,14 @@ public class TbMsgGeneratorNode implements TbNode { |
|
|
|
private ListenableFuture<TbMsg> generate(TbContext ctx, TbMsg msg) { |
|
|
|
log.trace("generate, config {}", config); |
|
|
|
if (prevMsg == null) { |
|
|
|
prevMsg = ctx.newMsg(null, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}"); |
|
|
|
prevMsg = ctx.newMsg(config.getQueueName(), "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}"); |
|
|
|
} |
|
|
|
if (initialized.get()) { |
|
|
|
ctx.logJsEvalRequest(); |
|
|
|
return Futures.transformAsync(scriptEngine.executeGenerateAsync(prevMsg), generated -> { |
|
|
|
log.trace("generate process response, generated {}, config {}", generated, config); |
|
|
|
ctx.logJsEvalResponse(); |
|
|
|
prevMsg = ctx.newMsg(null, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); |
|
|
|
prevMsg = ctx.newMsg(config.getQueueName(), generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); |
|
|
|
return Futures.immediateFuture(prevMsg); |
|
|
|
}, MoreExecutors.directExecutor()); //usually it runs on js-executor-remote-callback thread pool
|
|
|
|
} |
|
|
|
|