|
|
|
@ -169,7 +169,9 @@ class DefaultTbContext implements TbContext { |
|
|
|
private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer<Throwable> onFailure, Runnable onSuccess) { |
|
|
|
if (!tbMsg.isValid()) { |
|
|
|
log.trace("[{}] Skip invalid message: {}", getTenantId(), tbMsg); |
|
|
|
onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); |
|
|
|
if (onFailure != null) { |
|
|
|
onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); |
|
|
|
} |
|
|
|
return; |
|
|
|
} |
|
|
|
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() |
|
|
|
@ -242,7 +244,9 @@ class DefaultTbContext implements TbContext { |
|
|
|
private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) { |
|
|
|
if (!source.isValid()) { |
|
|
|
log.trace("[{}] Skip invalid message: {}", getTenantId(), source); |
|
|
|
onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); |
|
|
|
if (onFailure != null) { |
|
|
|
onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); |
|
|
|
} |
|
|
|
return; |
|
|
|
} |
|
|
|
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); |
|
|
|
|