Browse Source

CF Kafka states cleanup

pull/12713/head
ViacheslavKlimov 1 year ago
parent
commit
ffc88eeb2d
  1. 6
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java
  2. 38
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java
  3. 13
      common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java
  4. 3
      common/queue/src/main/java/org/thingsboard/server/queue/common/TbProtoQueueMsg.java
  5. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  6. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

6
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java

@ -52,9 +52,11 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
protected void processRestoredState(CalculatedFieldStateProto stateMsg) {
var id = fromProto(stateMsg.getId());
var state = fromProto(stateMsg);
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state));
processRestoredState(id, state);
}
protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state) {
actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(id, state));
}
}

38
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java

@ -24,10 +24,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
@ -46,6 +51,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToUuid;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.stringToBytes;
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.uuidToBytes;
@Service
@RequiredArgsConstructor
@Slf4j
@ -81,7 +91,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
.msgPackProcessor((msgs, consumer, config) -> {
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) {
try {
processRestoredState(msg.getValue());
if (msg.getValue() != null) {
processRestoredState(msg.getValue());
} else {
processRestoredState(getStateId(msg.getHeaders()), null);
}
} catch (Throwable t) {
log.error("Failed to process state message: {}", msg, t);
}
@ -103,7 +117,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
@Override
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId());
stateProducer.send(tpi, stateId.toKey(), new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto), new TbQueueCallback() {
TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto);
if (stateMsgProto == null) {
putStateId(msg.getHeaders(), stateId);
}
stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (callback != null) {
@ -122,7 +140,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
@Override
protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) {
//TODO: vklimov
doPersist(stateId, null, callback);
}
@Override
@ -138,6 +156,20 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
log.info("Restored {} calculated field states in {} ms", counter.get(), System.currentTimeMillis() - startTs);
}
private void putStateId(TbQueueMsgHeaders headers, CalculatedFieldEntityCtxId stateId) {
headers.put("tenantId", uuidToBytes(stateId.tenantId().getId()));
headers.put("cfId", uuidToBytes(stateId.cfId().getId()));
headers.put("entityId", uuidToBytes(stateId.entityId().getId()));
headers.put("entityType", stringToBytes(stateId.entityId().getEntityType().name()));
}
private CalculatedFieldEntityCtxId getStateId(TbQueueMsgHeaders headers) {
TenantId tenantId = TenantId.fromUUID(bytesToUuid(headers.get("tenantId")));
CalculatedFieldId cfId = new CalculatedFieldId(bytesToUuid(headers.get("cfId")));
EntityId entityId = EntityIdFactory.getByTypeAndUuid(bytesToString(headers.get("entityType")), bytesToUuid(headers.get("entityId")));
return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId);
}
@PreDestroy
private void preDestroy() {
stateConsumer.stop();

13
common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueTemplate.java

@ -24,35 +24,36 @@ public class AbstractTbQueueTemplate {
protected static final String RESPONSE_TOPIC_HEADER = "responseTopic";
protected static final String EXPIRE_TS_HEADER = "expireTs";
protected byte[] uuidToBytes(UUID uuid) {
public static byte[] uuidToBytes(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(16);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
protected static UUID bytesToUuid(byte[] bytes) {
public static UUID bytesToUuid(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
long firstLong = bb.getLong();
long secondLong = bb.getLong();
return new UUID(firstLong, secondLong);
}
protected byte[] stringToBytes(String string) {
public static byte[] stringToBytes(String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
protected String bytesToString(byte[] data) {
public static String bytesToString(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
protected static byte[] longToBytes(long x) {
public static byte[] longToBytes(long x) {
ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES);
longBuffer.putLong(0, x);
return longBuffer.array();
}
protected static long bytesToLong(byte[] bytes) {
public static long bytesToLong(byte[] bytes) {
return ByteBuffer.wrap(bytes).getLong();
}
}

3
common/queue/src/main/java/org/thingsboard/server/queue/common/TbProtoQueueMsg.java

@ -50,6 +50,7 @@ public class TbProtoQueueMsg<T extends com.google.protobuf.GeneratedMessageV3> i
@Override
public byte[] getData() {
return value.toByteArray();
return value != null ? value.toByteArray() : null;
}
}

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -556,7 +556,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
.stopWhenRead(true)
.clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet())
.groupId(topicService.buildTopicName("monolith-calculated-field-state-consumer"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), msg.getData() != null ? CalculatedFieldStateProto.parseFrom(msg.getData()) : null, msg.getHeaders()))
.admin(cfStateAdmin)
.statsService(consumerStatsService)
.build();

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -348,7 +348,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
.stopWhenRead(true)
.clientId("tb-rule-engine-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet())
.groupId(topicService.buildTopicName("tb-rule-engine-calculated-field-state-consumer"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), CalculatedFieldStateProto.parseFrom(msg.getData()), msg.getHeaders()))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), msg.getData() != null ? CalculatedFieldStateProto.parseFrom(msg.getData()) : null, msg.getHeaders()))
.admin(cfStateAdmin)
.statsService(consumerStatsService)
.build();

Loading…
Cancel
Save