From 2dfbe2240d4bd22904e3d5edd6a7edeb15b0d1a5 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 24 Dec 2024 16:25:01 +0200 Subject: [PATCH] added implementation to handle profile events when not my partition --- ...efaultCalculatedFieldExecutionService.java | 510 +++++++++--------- .../server/common/util/ProtoUtils.java | 40 ++ common/proto/src/main/proto/queue.proto | 14 +- 3 files changed, 306 insertions(+), 258 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index ff78d2925a..6e7536825a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -84,7 +84,6 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -98,6 +97,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.thingsboard.server.common.data.DataConstants.SCOPE; +import static org.thingsboard.server.common.util.ProtoUtils.fromObjectProto; +import static org.thingsboard.server.common.util.ProtoUtils.toObjectProto; @TbCoreComponent @Service @@ -229,6 +230,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); log.info("Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); + if (!tpi.isMyPartition()) { + clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldMsg(proto).build(), null); + log.debug("[{}][{}] Calculated field belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, calculatedFieldId, tpi.getFullTopicName()); + callback.onFailure(new RuntimeException("Calculated field belongs to external partition " + tpi.getFullTopicName() + "!")); + } if (proto.getDeleted()) { log.warn("Executing onCalculatedFieldDelete, calculatedFieldId=[{}]", calculatedFieldId); onCalculatedFieldDelete(tenantId, calculatedFieldId, callback); @@ -277,6 +284,54 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { + CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); + boolean shouldReinit = true; + if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) { + onCalculatedFieldDelete(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId(), callback); + } else { + callback.onSuccess(); + shouldReinit = false; + } + return shouldReinit; + } + + private void onCalculatedFieldDelete(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbCallback callback) { + try { + cleanupEntity(calculatedFieldId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); + Set calculatedFieldIds = partitionedEntities.get(tpi); + if (calculatedFieldIds != null) { + calculatedFieldIds.remove(calculatedFieldId); + } + calculatedFieldCache.evict(calculatedFieldId); + states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); + List statesToRemove = states.keySet().stream() + .filter(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())) + .map(JacksonUtil::writeValueAsString) + .toList(); + rocksDBService.deleteAll(statesToRemove); + } catch (Exception e) { + log.trace("Failed to delete calculated field: [{}]", calculatedFieldId, e); + callback.onFailure(e); + } + } + + private boolean hasSignificantChanges(CalculatedField oldCalculatedField, CalculatedField newCalculatedField) { + if (oldCalculatedField == null) { + return true; + } + boolean entityIdChanged = !oldCalculatedField.getEntityId().equals(newCalculatedField.getEntityId()); + boolean typeChanged = !oldCalculatedField.getType().equals(newCalculatedField.getType()); + CalculatedFieldConfiguration oldConfig = oldCalculatedField.getConfiguration(); + CalculatedFieldConfiguration newConfig = newCalculatedField.getConfiguration(); + boolean argumentsChanged = !oldConfig.getArguments().equals(newConfig.getArguments()); + boolean outputTypeChanged = !oldConfig.getOutput().getType().equals(newConfig.getOutput().getType()); + boolean expressionChanged = !oldConfig.getExpression().equals(newConfig.getExpression()); + + return entityIdChanged || typeChanged || argumentsChanged || outputTypeChanged || expressionChanged; + } + @Override public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List calculatedFieldIds, List telemetry) { try { @@ -288,8 +343,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } else if (EntityType.DEVICE.equals(entityType)) { profileId = deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); } - List cfLinks = calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId); - Optional.ofNullable(profileId).ifPresent(id -> calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, id)); + List cfLinks = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId)); + Optional.ofNullable(profileId).ifPresent(id -> { + cfLinks.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, id)); + }); cfLinks.forEach(link -> { CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId(); Map attributes = link.getConfiguration().getAttributes(); @@ -312,6 +369,23 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private String getMappedKey(KvEntry entry, Map attributes, Map timeSeries) { + if (entry instanceof AttributeKvEntry) { + return attributes.entrySet().stream() + .filter(attr -> attr.getValue().equals(entry.getKey())) + .map(Map.Entry::getKey) + .findFirst() + .orElse(entry.getKey()); + } else if (entry instanceof TsKvEntry) { + return timeSeries.entrySet().stream() + .filter(ts -> ts.getValue().equals(entry.getKey())) + .map(Map.Entry::getKey) + .findFirst() + .orElse(entry.getKey()); + } + return entry.getKey(); + } + private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List calculatedFieldIds, Map updatedTelemetry) { log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId); CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId); @@ -334,75 +408,23 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas log.info("Successfully updated telemetry for calculatedFieldId: [{}]", calculatedFieldId); } - private String getMappedKey(KvEntry entry, Map attributes, Map timeSeries) { - if (entry instanceof AttributeKvEntry) { - return attributes.entrySet().stream() - .filter(attr -> attr.getValue().equals(entry.getKey())) - .map(Map.Entry::getKey) - .findFirst() - .orElse(entry.getKey()); - } else if (entry instanceof TsKvEntry) { - return timeSeries.entrySet().stream() - .filter(ts -> ts.getValue().equals(entry.getKey())) - .map(Map.Entry::getKey) - .findFirst() - .orElse(entry.getKey()); - } - return entry.getKey(); - } - - private Object deserializeObjectProto(TransportProtos.ObjectProto objectProto) { - try { - String type = objectProto.getType(); - String value = objectProto.getValue(); - return switch (type) { - case "java.lang.String" -> value; - case "java.lang.Integer" -> Integer.parseInt(value); - case "java.lang.Long" -> Long.parseLong(value); - case "java.lang.Double" -> Double.parseDouble(value); - case "java.lang.Boolean" -> Boolean.parseBoolean(value); - default -> throw new IllegalArgumentException("Unsupported object type: " + type); - }; - } catch (Exception e) { - log.error("Failed to deserialize ObjectProto: [{}]", objectProto, e); - return null; - } - } - @Override public void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback) { try { TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - List calculatedFieldIds = new ArrayList<>(); - for (TransportProtos.CalculatedFieldIdProto cfIdProto : proto.getCalculatedFieldsList()) { - CalculatedFieldId cfId = new CalculatedFieldId(new UUID( - cfIdProto.getCalculatedFieldIdMSB(), - cfIdProto.getCalculatedFieldIdLSB() - )); - calculatedFieldIds.add(cfId); + + if (proto.getClear()) { + clearState(tenantId, calculatedFieldId, entityId); + return; } - Map argumentsMap = new HashMap<>(); - proto.getArgumentsMap().forEach((key, entryProto) -> { - ArgumentEntry argumentEntry; - if (entryProto.hasTsRecords()) { - TsRollingArgumentEntry tsRollingArgumentEntry = new TsRollingArgumentEntry(); - entryProto.getTsRecords().getTsRecordsMap().forEach((ts, objectProto) -> { - Object value = deserializeObjectProto(objectProto); - tsRollingArgumentEntry.getTsRecords().put(ts, value); - }); - argumentEntry = tsRollingArgumentEntry; - } else if (entryProto.hasSingleValue()) { - TransportProtos.SingleValueProto singleRecordProto = entryProto.getSingleValue(); - Object value = deserializeObjectProto(singleRecordProto.getValue()); - argumentEntry = new SingleValueArgumentEntry(singleRecordProto.getTs(), value); - } else { - throw new IllegalArgumentException("Unsupported ArgumentEntryProto type"); - } - argumentsMap.put(key, argumentEntry); - }); + List calculatedFieldIds = proto.getCalculatedFieldsList().stream() + .map(cfIdProto -> new CalculatedFieldId(new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) + .toList(); + Map argumentsMap = proto.getArgumentsMap().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> fromArgumentEntryProto(entry.getValue()))); CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, calculatedFieldIds); @@ -424,16 +446,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId); calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) - .forEach(cfId -> { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); - if (tpi.isMyPartition()) { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); - states.remove(ctxId); - rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); - } else { - sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, Collections.emptyList(), null); - } - }); + .forEach(cfId -> clearState(tenantId, cfId, entityId)); initializeStateForEntityByProfile(tenantId, entityId, newProfileId, callback); } catch (Exception e) { @@ -455,16 +468,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream().map(CalculatedFieldLink::getCalculatedFieldId), calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream().map(CalculatedFieldLink::getCalculatedFieldId) ).toList(); - calculatedFieldIds.forEach(cfId -> { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); - if (tpi.isMyPartition()) { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); - states.remove(ctxId); - rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); - } else { - sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, Collections.emptyList(), null); - } - }); + calculatedFieldIds.forEach(cfId -> clearState(tenantId, cfId, entityId)); } else { log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId); @@ -475,94 +479,16 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, List calculatedFieldIds, Map argumentValues) { - TransportProtos.CalculatedFieldStateMsgProto.Builder msgBuilder = TransportProtos.CalculatedFieldStateMsgProto.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits()) - .setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits()) - .setEntityType(entityId.getEntityType().name()) - .setEntityIdMSB(entityId.getId().getMostSignificantBits()) - .setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - - if (argumentValues != null) { - argumentValues.forEach((key, argumentEntry) -> { - TransportProtos.ArgumentEntryProto.Builder argumentEntryProtoBuilder = TransportProtos.ArgumentEntryProto.newBuilder(); - - if (argumentEntry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) { - TransportProtos.TsRollingProto.Builder tsRollingProtoBuilder = TransportProtos.TsRollingProto.newBuilder(); - - tsRollingArgumentEntry.getTsRecords().forEach((ts, value) -> { - TransportProtos.ObjectProto.Builder objectProtoBuilder = TransportProtos.ObjectProto.newBuilder() - .setType(value.getClass().getName()) - .setValue(value.toString()); - tsRollingProtoBuilder.putTsRecords(ts, objectProtoBuilder.build()); - }); - - argumentEntryProtoBuilder.setTsRecords(tsRollingProtoBuilder.build()); - } else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { - TransportProtos.SingleValueProto.Builder singleRecordProtoBuilder = TransportProtos.SingleValueProto.newBuilder() - .setTs(singleValueArgumentEntry.getTs()) - .setValue(TransportProtos.ObjectProto.newBuilder() - .setType(singleValueArgumentEntry.getValue().getClass().getName()) - .setValue(singleValueArgumentEntry.getValue().toString()) - .build()); - argumentEntryProtoBuilder.setSingleValue(singleRecordProtoBuilder.build()); - } - - msgBuilder.putArguments(key, argumentEntryProtoBuilder.build()); - }); - } - - clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); - } - - private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { - CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); - boolean shouldReinit = true; - if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) { - onCalculatedFieldDelete(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId(), callback); + private void clearState(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); + if (tpi.isMyPartition()) { + log.warn("Executing clearState, calculatedFieldId=[{}], entityId=[{}]", calculatedFieldId, entityId); + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId()); + states.remove(ctxId); + rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); } else { - callback.onSuccess(); - shouldReinit = false; + sendClearCalculatedFieldStateMsg(tenantId, calculatedFieldId, entityId); } - return shouldReinit; - } - - private void onCalculatedFieldDelete(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbCallback callback) { - try { - cleanupEntity(calculatedFieldId); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); - Set calculatedFieldIds = partitionedEntities.get(tpi); - if (calculatedFieldIds != null) { - calculatedFieldIds.remove(calculatedFieldId); - } - calculatedFieldCache.evict(calculatedFieldId); - states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); - List statesToRemove = states.keySet().stream() - .filter(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())) - .map(JacksonUtil::writeValueAsString) - .toList(); - rocksDBService.deleteAll(statesToRemove); - } catch (Exception e) { - log.trace("Failed to delete calculated field: [{}]", calculatedFieldId, e); - callback.onFailure(e); - } - } - - private boolean hasSignificantChanges(CalculatedField oldCalculatedField, CalculatedField newCalculatedField) { - if (oldCalculatedField == null) { - return true; - } - boolean entityIdChanged = !oldCalculatedField.getEntityId().equals(newCalculatedField.getEntityId()); - boolean typeChanged = !oldCalculatedField.getType().equals(newCalculatedField.getType()); - CalculatedFieldConfiguration oldConfig = oldCalculatedField.getConfiguration(); - CalculatedFieldConfiguration newConfig = newCalculatedField.getConfiguration(); - boolean argumentsChanged = !oldConfig.getArguments().equals(newConfig.getArguments()); - boolean outputTypeChanged = !oldConfig.getOutput().getType().equals(newConfig.getOutput().getType()); - boolean expressionChanged = !oldConfig.getExpression().equals(newConfig.getExpression()); - - return entityIdChanged || typeChanged || argumentsChanged || outputTypeChanged || expressionChanged; } private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) { @@ -605,82 +531,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor); } - private ListenableFuture fetchArguments(TenantId tenantId, EntityId entityId, Map necessaryArguments, Consumer> onComplete) { - Map argumentValues = new HashMap<>(); - List> futures = new ArrayList<>(); - necessaryArguments.forEach((key, argument) -> { - futures.add(Futures.transform(fetchArgumentValue(tenantId, entityId, argument), - result -> { - argumentValues.put(key, result); - return result; - }, calculatedFieldCallbackExecutor)); - }); - return Futures.transform(Futures.allAsList(futures), results -> { - onComplete.accept(argumentValues); - return null; - }, calculatedFieldCallbackExecutor); - } - - private ListenableFuture fetchArgumentValue(TenantId tenantId, EntityId targetEntityId, Argument argument) { - EntityId argumentEntityId = argument.getEntityId(); - EntityId entityId = isProfileEntity(argumentEntityId) - ? targetEntityId - : argumentEntityId; - return fetchKvEntry(tenantId, entityId, argument); - } - - private ListenableFuture fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { - return switch (argument.getType()) { - case "TS_ROLLING" -> fetchTsRolling(tenantId, entityId, argument); - case "ATTRIBUTE" -> transformSingleValueArgument( - Futures.transform( - attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()), - result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), - calculatedFieldCallbackExecutor) - ); - case "TS_LATEST" -> transformSingleValueArgument( - Futures.transform( - timeseriesService.findLatest(tenantId, entityId, argument.getKey()), - result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), - calculatedFieldCallbackExecutor)); - default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); - }; - } - - private ListenableFuture fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument) { - long currentTime = System.currentTimeMillis(); - long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow(); - long startTs = currentTime - timeWindow; - int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit(); - - ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE); - ListenableFuture> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query)); - - return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor); - } - - private ListenableFuture transformSingleValueArgument(ListenableFuture> kvEntryFuture) { - return Futures.transform(kvEntryFuture, kvEntry -> { - if (kvEntry.isPresent() && kvEntry.get().getValue() != null) { - return ArgumentEntry.createSingleValueArgument(kvEntry.get()); - } else { - return SingleValueArgumentEntry.EMPTY; - } - }, calculatedFieldCallbackExecutor); - } - - private KvEntry createDefaultKvEntry(Argument argument) { - String key = argument.getKey(); - String defaultValue = argument.getDefaultValue(); - if (NumberUtils.isParsable(defaultValue)) { - return new DoubleDataEntry(key, Double.parseDouble(defaultValue)); - } - if ("true".equalsIgnoreCase(defaultValue) || "false".equalsIgnoreCase(defaultValue)) { - return new BooleanDataEntry(key, Boolean.parseBoolean(defaultValue)); - } - return new StringDataEntry(key, defaultValue); - } - private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List calculatedFieldIds) { TenantId tenantId = calculatedFieldCtx.getTenantId(); CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); @@ -742,14 +592,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, MoreExecutors.directExecutor()); } - private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) { - String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); - if (stateStr == null) { - return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); - } - return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); - } - private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List calculatedFieldIds) { try { String type = calculatedFieldResult.getType(); @@ -770,6 +612,166 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private ListenableFuture fetchArguments(TenantId tenantId, EntityId entityId, Map necessaryArguments, Consumer> onComplete) { + Map argumentValues = new HashMap<>(); + List> futures = new ArrayList<>(); + necessaryArguments.forEach((key, argument) -> { + futures.add(Futures.transform(fetchArgumentValue(tenantId, entityId, argument), + result -> { + argumentValues.put(key, result); + return result; + }, calculatedFieldCallbackExecutor)); + }); + return Futures.transform(Futures.allAsList(futures), results -> { + onComplete.accept(argumentValues); + return null; + }, calculatedFieldCallbackExecutor); + } + + private ListenableFuture fetchArgumentValue(TenantId tenantId, EntityId targetEntityId, Argument argument) { + EntityId argumentEntityId = argument.getEntityId(); + EntityId entityId = isProfileEntity(argumentEntityId) + ? targetEntityId + : argumentEntityId; + return fetchKvEntry(tenantId, entityId, argument); + } + + private ListenableFuture fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { + return switch (argument.getType()) { + case "TS_ROLLING" -> fetchTsRolling(tenantId, entityId, argument); + case "ATTRIBUTE" -> transformSingleValueArgument( + Futures.transform( + attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()), + result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), + calculatedFieldCallbackExecutor) + ); + case "TS_LATEST" -> transformSingleValueArgument( + Futures.transform( + timeseriesService.findLatest(tenantId, entityId, argument.getKey()), + result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), + calculatedFieldCallbackExecutor)); + default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); + }; + } + + private ListenableFuture transformSingleValueArgument(ListenableFuture> kvEntryFuture) { + return Futures.transform(kvEntryFuture, kvEntry -> { + if (kvEntry.isPresent() && kvEntry.get().getValue() != null) { + return ArgumentEntry.createSingleValueArgument(kvEntry.get()); + } else { + return SingleValueArgumentEntry.EMPTY; + } + }, calculatedFieldCallbackExecutor); + } + + private ListenableFuture fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument) { + long currentTime = System.currentTimeMillis(); + long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow(); + long startTs = currentTime - timeWindow; + int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit(); + + ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE); + ListenableFuture> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query)); + + return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor); + } + + private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, List calculatedFieldIds, Map argumentValues) { + TransportProtos.CalculatedFieldStateMsgProto.Builder msgBuilder = createBaseCalculatedFieldStateMsg(tenantId, calculatedFieldId, entityId); + if (argumentValues != null) { + argumentValues.forEach((key, argumentEntry) -> msgBuilder.putArguments(key, toArgumentEntryProto(argumentEntry))); + } + if (calculatedFieldIds != null) { + calculatedFieldIds.forEach(cfId -> msgBuilder.addCalculatedFields( + TransportProtos.CalculatedFieldIdProto.newBuilder() + .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits()) + .build() + )); + } + + clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null); + } + + private void sendClearCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId) { + TransportProtos.CalculatedFieldStateMsgProto msg = createBaseCalculatedFieldStateMsg(tenantId, calculatedFieldId, entityId) + .setClear(true) + .build(); + + clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msg).build(), null); + } + + private TransportProtos.CalculatedFieldStateMsgProto.Builder createBaseCalculatedFieldStateMsg( + TenantId tenantId, + CalculatedFieldId calculatedFieldId, + EntityId entityId + ) { + return TransportProtos.CalculatedFieldStateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits()) + .setEntityType(entityId.getEntityType().name()) + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + } + + private TransportProtos.ArgumentEntryProto toArgumentEntryProto(ArgumentEntry argumentEntry) { + TransportProtos.ArgumentEntryProto.Builder argumentProtoBuilder = TransportProtos.ArgumentEntryProto.newBuilder(); + + if (argumentEntry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) { + TransportProtos.TsRollingProto.Builder tsRollingProtoBuilder = TransportProtos.TsRollingProto.newBuilder(); + tsRollingArgumentEntry.getTsRecords().forEach((ts, value) -> + tsRollingProtoBuilder.putTsRecords(ts, toObjectProto(value)) + ); + argumentProtoBuilder.setTsRecords(tsRollingProtoBuilder.build()); + } else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { + argumentProtoBuilder.setSingleValue( + TransportProtos.SingleValueProto.newBuilder() + .setTs(singleValueArgumentEntry.getTs()) + .setValue(toObjectProto(singleValueArgumentEntry.getValue())) + .build() + ); + } + + return argumentProtoBuilder.build(); + } + + private ArgumentEntry fromArgumentEntryProto(TransportProtos.ArgumentEntryProto entryProto) { + if (entryProto.hasTsRecords()) { + TsRollingArgumentEntry tsRollingArgumentEntry = new TsRollingArgumentEntry(); + entryProto.getTsRecords().getTsRecordsMap().forEach((ts, objectProto) -> + tsRollingArgumentEntry.getTsRecords().put(ts, fromObjectProto(objectProto)) + ); + return tsRollingArgumentEntry; + } else if (entryProto.hasSingleValue()) { + TransportProtos.SingleValueProto singleValueProto = entryProto.getSingleValue(); + return new SingleValueArgumentEntry(singleValueProto.getTs(), fromObjectProto(singleValueProto.getValue())); + } else { + throw new IllegalArgumentException("Unsupported ArgumentEntryProto type"); + } + } + + private KvEntry createDefaultKvEntry(Argument argument) { + String key = argument.getKey(); + String defaultValue = argument.getDefaultValue(); + if (NumberUtils.isParsable(defaultValue)) { + return new DoubleDataEntry(key, Double.parseDouble(defaultValue)); + } + if ("true".equalsIgnoreCase(defaultValue) || "false".equalsIgnoreCase(defaultValue)) { + return new BooleanDataEntry(key, Boolean.parseBoolean(defaultValue)); + } + return new StringDataEntry(key, defaultValue); + } + + private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) { + String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); + if (stateStr == null) { + return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); + } + return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); + } + private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { ObjectNode payload = JacksonUtil.newObjectNode(); Map resultMap = calculatedFieldResult.getResultMap(); diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 264b23f118..ec17914fd8 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -1183,6 +1183,46 @@ public class ProtoUtils { return builder.build(); } + public static TransportProtos.ObjectProto toObjectProto(Object value) { + if (value == null) { + throw new IllegalArgumentException("Cannot convert null to ObjectProto"); + } + + TransportProtos.ObjectProto.Builder builder = TransportProtos.ObjectProto.newBuilder(); + + if (value instanceof String) { + builder.setStringValue((String) value); + } else if (value instanceof Integer) { + builder.setIntValue((Integer) value); + } else if (value instanceof Long) { + builder.setLongValue((Long) value); + } else if (value instanceof Double) { + builder.setDoubleValue((Double) value); + } else if (value instanceof Boolean) { + builder.setBoolValue((Boolean) value); + } else { + throw new IllegalArgumentException("Unsupported value type: " + value.getClass().getName()); + } + + return builder.build(); + } + + public static Object fromObjectProto(TransportProtos.ObjectProto proto) { + try { + return switch (proto.getValueCase()) { + case STRINGVALUE -> proto.getStringValue(); + case INTVALUE -> proto.getIntValue(); + case LONGVALUE -> proto.getLongValue(); + case DOUBLEVALUE -> proto.getDoubleValue(); + case BOOLVALUE -> proto.getBoolValue(); + case VALUE_NOT_SET -> throw new IllegalArgumentException("Value not set in ObjectProto"); + }; + } catch (Exception e) { + log.error("Failed to deserialize ObjectProto: [{}]", proto, e); + return null; + } + } + private static boolean isNotNull(Object obj) { return obj != null; } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index f6f7afd5d8..581f9eb9f2 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -817,8 +817,9 @@ message CalculatedFieldStateMsgProto { string entityType = 5; int64 entityIdMSB = 6; int64 entityIdLSB = 7; - repeated CalculatedFieldIdProto calculatedFields = 8; - map arguments = 9; + bool clear = 8; + repeated CalculatedFieldIdProto calculatedFields = 9; + map arguments = 10; } message CalculatedFieldIdProto { @@ -843,8 +844,13 @@ message SingleValueProto { } message ObjectProto { - string type = 1; - string value = 2; + oneof value { + string stringValue = 1; + int32 intValue = 2; + int64 longValue = 3; + double doubleValue = 4; + bool boolValue = 5; + } } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.