From ffc88eeb2d0fbfefa6d5dd56f47585c3249e5a10 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 18 Feb 2025 15:28:46 +0200 Subject: [PATCH] CF Kafka states cleanup --- .../AbstractCalculatedFieldStateService.java | 6 ++- .../KafkaCalculatedFieldStateService.java | 38 +++++++++++++++++-- .../queue/common/AbstractTbQueueTemplate.java | 13 ++++--- .../server/queue/common/TbProtoQueueMsg.java | 3 +- .../provider/KafkaMonolithQueueFactory.java | 2 +- .../KafkaTbRuleEngineQueueFactory.java | 2 +- 6 files changed, 50 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index e390d9e55c..c55ec00379 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -52,9 +52,11 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF protected void processRestoredState(CalculatedFieldStateProto stateMsg) { var id = fromProto(stateMsg.getId()); var state = fromProto(stateMsg); - actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state)); + processRestoredState(id, state); } - + protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state) { + actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state)); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index cce7e5ef22..959522ca63 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -24,10 +24,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -46,6 +51,11 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToUuid; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.stringToBytes; +import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.uuidToBytes; + @Service @RequiredArgsConstructor @Slf4j @@ -81,7 +91,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg msg : msgs) { try { - processRestoredState(msg.getValue()); + if (msg.getValue() != null) { + processRestoredState(msg.getValue()); + } else { + processRestoredState(getStateId(msg.getHeaders()), null); + } } catch (Throwable t) { log.error("Failed to process state message: {}", msg, t); } @@ -103,7 +117,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId()); - stateProducer.send(tpi, stateId.toKey(), new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto), new TbQueueCallback() { + TbProtoQueueMsg msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); + if (stateMsgProto == null) { + putStateId(msg.getHeaders(), stateId); + } + stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { if (callback != null) { @@ -122,7 +140,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) { - //TODO: vklimov + doPersist(stateId, null, callback); } @Override @@ -138,6 +156,20 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta log.info("Restored {} calculated field states in {} ms", counter.get(), System.currentTimeMillis() - startTs); } + private void putStateId(TbQueueMsgHeaders headers, CalculatedFieldEntityCtxId stateId) { + headers.put("tenantId", uuidToBytes(stateId.tenantId().getId())); + headers.put("cfId", uuidToBytes(stateId.cfId().getId())); + headers.put("entityId", uuidToBytes(stateId.entityId().getId())); + headers.put("entityType", stringToBytes(stateId.entityId().getEntityType().name())); + } + + private CalculatedFieldEntityCtxId getStateId(TbQueueMsgHeaders headers) { + TenantId tenantId = TenantId.fromUUID(bytesToUuid(headers.get("tenantId"))); + CalculatedFieldId cfId = new CalculatedFieldId(bytesToUuid(headers.get("cfId"))); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(bytesToString(headers.get("entityType")), bytesToUuid(headers.get("entityId"))); + return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId); + } + @PreDestroy private void preDestroy() { stateConsumer.stop(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java index f11eaaef48..51ed1bb05f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java @@ -24,35 +24,36 @@ public class AbstractTbQueueTemplate { protected static final String RESPONSE_TOPIC_HEADER = "responseTopic"; protected static final String EXPIRE_TS_HEADER = "expireTs"; - protected byte[] uuidToBytes(UUID uuid) { + public static byte[] uuidToBytes(UUID uuid) { ByteBuffer buf = ByteBuffer.allocate(16); buf.putLong(uuid.getMostSignificantBits()); buf.putLong(uuid.getLeastSignificantBits()); return buf.array(); } - protected static UUID bytesToUuid(byte[] bytes) { + public static UUID bytesToUuid(byte[] bytes) { ByteBuffer bb = ByteBuffer.wrap(bytes); long firstLong = bb.getLong(); long secondLong = bb.getLong(); return new UUID(firstLong, secondLong); } - protected byte[] stringToBytes(String string) { + public static byte[] stringToBytes(String string) { return string.getBytes(StandardCharsets.UTF_8); } - protected String bytesToString(byte[] data) { + public static String bytesToString(byte[] data) { return new String(data, StandardCharsets.UTF_8); } - protected static byte[] longToBytes(long x) { + public static byte[] longToBytes(long x) { ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES); longBuffer.putLong(0, x); return longBuffer.array(); } - protected static long bytesToLong(byte[] bytes) { + public static long bytesToLong(byte[] bytes) { return ByteBuffer.wrap(bytes).getLong(); } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbProtoQueueMsg.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbProtoQueueMsg.java index fb7d086daa..6bb72a65fa 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/TbProtoQueueMsg.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/TbProtoQueueMsg.java @@ -50,6 +50,7 @@ public class TbProtoQueueMsg i @Override public byte[] getData() { - return value.toByteArray(); + return value != null ? value.toByteArray() : null; } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 91ba63427b..d768648aea 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -556,7 +556,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi .stopWhenRead(true) .clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()) .groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders())) + .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), msg.getData() != null ? CalculatedFieldStateProto.parseFrom(msg.getData()) : null, msg.getHeaders())) .admin(cfStateAdmin) .statsService(consumerStatsService) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index d4076ba67d..45a2290f52 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -348,7 +348,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { .stopWhenRead(true) .clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()) .groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders())) + .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), msg.getData() != null ? CalculatedFieldStateProto.parseFrom(msg.getData()) : null, msg.getHeaders())) .admin(cfStateAdmin) .statsService(consumerStatsService) .build();