Browse Source

ThingsBoard Rule Nodes improvements

pull/1322/head
Andrew Shvayka 8 years ago
parent
commit
1bf196bfdf
  1. 19
      application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
  2. 14
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationBeginNode.java
  3. 8
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationEndNode.java

19
application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java

@ -182,12 +182,14 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
while (true) { while (true) {
TbTransactionTask transactionTask = timeoutQueue.peek(); TbTransactionTask transactionTask = timeoutQueue.peek();
if (transactionTask != null) { if (transactionTask != null) {
long sleepDuration = 0L;
transactionLock.lock(); transactionLock.lock();
try { try {
if (transactionTask.isCompleted()) { if (transactionTask.isCompleted()) {
timeoutQueue.poll(); timeoutQueue.poll();
} else { } else {
if (System.currentTimeMillis() > transactionTask.getExpirationTime()) { long expIn = transactionTask.getExpirationTime() - System.currentTimeMillis();
if (expIn < 0) {
log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
timeoutQueue.poll(); timeoutQueue.poll();
executeOnFailure(transactionTask.getOnFailure(), "Task has expired!"); executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
@ -201,17 +203,20 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
} }
} }
} else { } else {
try { sleepDuration = Math.min(expIn, duration);
log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
TimeUnit.MILLISECONDS.sleep(duration);
} catch (InterruptedException e) {
throw new IllegalStateException("Thread interrupted", e);
}
} }
} }
} finally { } finally {
transactionLock.unlock(); transactionLock.unlock();
} }
if (sleepDuration > 0L) {
try {
log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
TimeUnit.MILLISECONDS.sleep(sleepDuration);
} catch (InterruptedException e) {
throw new IllegalStateException("Thread interrupted", e);
}
}
} else { } else {
try { try {
log.trace("Queue is empty, waiting for tasks!"); log.trace("Queue is empty, waiting for tasks!");

14
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java → rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationBeginNode.java

@ -35,13 +35,15 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j @Slf4j
@RuleNode( @RuleNode(
type = ComponentType.ACTION, type = ComponentType.ACTION,
name = "transaction start", name = "synchronization start",
configClazz = EmptyNodeConfiguration.class, configClazz = EmptyNodeConfiguration.class,
nodeDescription = "", nodeDescription = "Starts synchronization of message processing based on message originator",
nodeDetails = "", nodeDetails = "This node should be used together with \"synchronization end\" node. \n This node will put messages into queue based on message originator id. \n" +
"Subsequent messages will not be processed until the previous message processing is completed or timeout event occurs.\n" +
"Size of the queue per originator and timeout values are configurable on a system level",
uiResources = {"static/rulenode/rulenode-core-config.js"}, uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbNodeEmptyConfig") configDirective = "tbNodeEmptyConfig")
public class TbTransactionBeginNode implements TbNode { public class TbSynchronizationBeginNode implements TbNode {
private EmptyNodeConfiguration config; private EmptyNodeConfiguration config;
@ -51,7 +53,7 @@ public class TbTransactionBeginNode implements TbNode {
} }
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { public void onMsg(TbContext ctx, TbMsg msg) {
log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType()); log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType());
TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator()); TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator());
@ -63,7 +65,7 @@ public class TbTransactionBeginNode implements TbNode {
ctx.tellNext(startMsg, SUCCESS); ctx.tellNext(startMsg, SUCCESS);
}, endMsg -> log.trace("Transaction ended successfully...[{}][{}]", endMsg.getId(), endMsg.getType()), }, endMsg -> log.trace("Transaction ended successfully...[{}][{}]", endMsg.getId(), endMsg.getType()),
throwable -> { throwable -> {
log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable); log.trace("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable);
ctx.tellFailure(tbMsg, throwable); ctx.tellFailure(tbMsg, throwable);
}); });
} }

8
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java → rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationEndNode.java

@ -33,14 +33,14 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j @Slf4j
@RuleNode( @RuleNode(
type = ComponentType.ACTION, type = ComponentType.ACTION,
name = "transaction end", name = "synchronization end",
configClazz = EmptyNodeConfiguration.class, configClazz = EmptyNodeConfiguration.class,
nodeDescription = "", nodeDescription = "Stops synchronization of message processing based on message originator",
nodeDetails = "", nodeDetails = "",
uiResources = {"static/rulenode/rulenode-core-config.js"}, uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = ("tbNodeEmptyConfig") configDirective = ("tbNodeEmptyConfig")
) )
public class TbTransactionEndNode implements TbNode { public class TbSynchronizationEndNode implements TbNode {
private EmptyNodeConfiguration config; private EmptyNodeConfiguration config;
@ -50,7 +50,7 @@ public class TbTransactionEndNode implements TbNode {
} }
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { public void onMsg(TbContext ctx, TbMsg msg) {
ctx.getRuleChainTransactionService().endTransaction(msg, ctx.getRuleChainTransactionService().endTransaction(msg,
successMsg -> ctx.tellNext(successMsg, SUCCESS), successMsg -> ctx.tellNext(successMsg, SUCCESS),
throwable -> ctx.tellFailure(msg, throwable)); throwable -> ctx.tellFailure(msg, throwable));
Loading…
Cancel
Save