Browse Source

fixed checkpoint node

pull/6534/head
YevhenBondarenko 4 years ago
parent
commit
abe127c3fa
  1. 1
      application/src/main/java/org/thingsboard/server/controller/QueueController.java
  2. 10
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java
  3. 2
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java

1
application/src/main/java/org/thingsboard/server/controller/QueueController.java

@ -107,6 +107,7 @@ public class QueueController extends BaseController {
checkParameter("queueId", queueIdStr);
try {
QueueId queueId = new QueueId(UUID.fromString(queueIdStr));
checkQueueId(queueId, Operation.READ);
return checkNotNull(queueService.findQueueById(getTenantId(), queueId));
} catch (
Exception e) {

10
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java

@ -23,9 +23,12 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.UUID;
@Slf4j
@RuleNode(
type = ComponentType.FLOW,
@ -38,16 +41,17 @@ import org.thingsboard.server.common.msg.TbMsg;
)
public class TbCheckpointNode implements TbNode {
private TbCheckpointNodeConfiguration config;
private QueueId queueId;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbCheckpointNodeConfiguration.class);
TbCheckpointNodeConfiguration config = TbNodeUtils.convert(configuration, TbCheckpointNodeConfiguration.class);
this.queueId = new QueueId(UUID.fromString(config.getQueueId()));
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
ctx.enqueueForTellNext(msg, config.getQueueId(), TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
ctx.enqueueForTellNext(msg, queueId, TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
}
@Override

2
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNodeConfiguration.java

@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.id.QueueId;
@Data
public class TbCheckpointNodeConfiguration implements NodeConfiguration<TbCheckpointNodeConfiguration> {
private QueueId queueId;
private String queueId;
@Override
public TbCheckpointNodeConfiguration defaultConfiguration() {

Loading…
Cancel
Save