|
|
|
@ -106,7 +106,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta |
|
|
|
|
|
|
|
@Override |
|
|
|
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { |
|
|
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); |
|
|
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); |
|
|
|
TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); |
|
|
|
if (stateMsgProto == null) { |
|
|
|
putStateId(msg.getHeaders(), stateId); |
|
|
|
|