diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 2f55de6b77..f7bcb4e678 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -35,7 +35,6 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.cluster.TbClusterService; -import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; @@ -98,7 +97,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.thingsboard.server.common.data.DataConstants.SCOPE; import static org.thingsboard.server.common.util.ProtoUtils.fromObjectProto; @@ -115,7 +113,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private final CalculatedFieldCache calculatedFieldCache; private final AttributesService attributesService; private final TimeseriesService timeseriesService; - // private final RocksDBService rocksDBService; + private final RocksDBService rocksDBService; private final TbClusterService clusterService; private final TbelInvokeService tbelInvokeService; @@ -168,35 +166,38 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas protected Map>> onAddedPartitions(Set addedPartitions) { var result = new HashMap>>(); PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); - Map> tpiCalculatedFieldMap = new HashMap<>(); + Map> tpiTargetEntityMap = new HashMap<>(); for (CalculatedField cf : cfs) { - TopicPartitionInfo tpi; - try { - tpi = partitionService.resolve(ServiceType.TB_CORE, cf.getTenantId(), cf.getId()); - } catch (Exception e) { - log.warn("Failed to resolve partition for CalculatedField [{}], tenant id [{}]. Reason: {}", - cf.getId(), cf.getTenantId(), e.getMessage()); - continue; - } - if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId().getId()))) { - tpiCalculatedFieldMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(cf); + + Consumer resolvePartition = entityId -> { + TopicPartitionInfo tpi; + try { + tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, cf.getTenantId(), entityId); + if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId().getId()))) { + tpiTargetEntityMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(new CalculatedFieldEntityCtxId(cf.getId().getId(), entityId.getId())); + } + } catch (Exception e) { + log.warn("Failed to resolve partition for CalculatedFieldEntityCtxId: entityId=[{}], tenantId=[{}]. Reason: {}", + entityId, cf.getTenantId(), e.getMessage()); + } + }; + + EntityId cfEntityId = cf.getEntityId(); + if (isProfileEntity(cfEntityId)) { + calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId).forEach(resolvePartition); + } else { + resolvePartition.accept(cfEntityId); } } - for (var entry : tpiCalculatedFieldMap.entrySet()) { - for (List partition : Lists.partition(entry.getValue(), 1000)) { + for (var entry : tpiTargetEntityMap.entrySet()) { + for (List partition : Lists.partition(entry.getValue(), 1000)) { log.info("[{}] Submit task for CalculatedFields: {}", entry.getKey(), partition.size()); var future = calculatedFieldExecutor.submit(() -> { try { - for (CalculatedField cf : partition) { - EntityId cfEntityId = cf.getEntityId(); - if (isProfileEntity(cfEntityId)) { - calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId) - .forEach(entityId -> restoreState(cf, entityId)); - } else { - restoreState(cf, cfEntityId); - } + for (CalculatedFieldEntityCtxId ctxId : partition) { + restoreState(ctxId.cfId(), ctxId.entityId()); } } catch (Throwable t) { log.error("Unexpected exception while restoring CalculatedField states", t); @@ -209,17 +210,16 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return result; } - private void restoreState(CalculatedField cf, EntityId entityId) { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cf.getId().getId(), entityId.getId()); -// String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId)); + private void restoreState(UUID calculatedFieldId, UUID entityId) { + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId, entityId); + String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId)); - String storedState = null; if (storedState != null) { CalculatedFieldEntityCtx restoredCtx = JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class); states.put(ctxId, restoredCtx); - log.info("Restored state for CalculatedField [{}]", cf.getId()); + log.info("Restored state for CalculatedField [{}]", calculatedFieldId); } else { - log.warn("No state found for CalculatedField [{}], entity [{}].", cf.getId(), entityId); + log.warn("No state found for CalculatedField [{}], entity [{}].", calculatedFieldId, entityId); } } @@ -238,12 +238,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); log.info("Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); - if (!tpi.isMyPartition()) { - clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldMsg(proto).build(), null); - log.debug("[{}][{}] Calculated field belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, calculatedFieldId, tpi.getFullTopicName()); - callback.onFailure(new RuntimeException("Calculated field belongs to external partition " + tpi.getFullTopicName() + "!")); - } if (proto.getDeleted()) { log.warn("Executing onCalculatedFieldDelete, calculatedFieldId=[{}]", calculatedFieldId); onCalculatedFieldDelete(tenantId, calculatedFieldId, callback); @@ -307,18 +301,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void onCalculatedFieldDelete(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbCallback callback) { try { cleanupEntity(calculatedFieldId); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); - Set calculatedFieldIds = partitionedEntities.get(tpi); - if (calculatedFieldIds != null) { - calculatedFieldIds.remove(calculatedFieldId); - } -// calculatedFieldCache.evict(calculatedFieldId); states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())); List statesToRemove = states.keySet().stream() .filter(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId())) .map(JacksonUtil::writeValueAsString) .toList(); -// rocksDBService.deleteAll(statesToRemove); + rocksDBService.deleteAll(statesToRemove); } catch (Exception e) { log.trace("Failed to delete calculated field: [{}]", calculatedFieldId, e); callback.onFailure(e); @@ -345,22 +333,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas try { TenantId tenantId = calculatedFieldTelemetryUpdateRequest.getTenantId(); EntityId entityId = calculatedFieldTelemetryUpdateRequest.getEntityId(); - AttributeScope scope = calculatedFieldTelemetryUpdateRequest.getScope(); - List telemetry = calculatedFieldTelemetryUpdateRequest.getKvEntries(); - List calculatedFieldIds = calculatedFieldTelemetryUpdateRequest.getCalculatedFieldIds(); if (supportedReferencedEntities.contains(entityId.getEntityType())) { EntityId profileId = getProfileId(tenantId, entityId); - List cfLinks = Stream.concat( - calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream(), - profileId != null ? calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream() : Stream.empty() - ).toList(); - - cfLinks.forEach(link -> { + getCalculatedFieldLinks(tenantId, entityId, profileId).forEach(link -> { CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId(); - Map telemetryKeys = getTelemetryKeysFromLink(link, scope); - Map updatedTelemetry = telemetry.stream() + Map telemetryKeys = calculatedFieldTelemetryUpdateRequest.getTelemetryKeysFromLink(link); + Map updatedTelemetry = calculatedFieldTelemetryUpdateRequest.getKvEntries().stream() .filter(entry -> telemetryKeys.containsValue(entry.getKey())) .collect(Collectors.toMap( entry -> getMappedKey(entry, telemetryKeys), @@ -369,7 +349,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas )); if (!updatedTelemetry.isEmpty()) { - executeTelemetryUpdate(tenantId, entityId, calculatedFieldId, calculatedFieldIds, updatedTelemetry); + List previousCalculatedFieldIds = calculatedFieldTelemetryUpdateRequest.getPreviousCalculatedFieldIds(); + executeTelemetryUpdate(tenantId, entityId, calculatedFieldId, previousCalculatedFieldIds, updatedTelemetry); } }); } @@ -378,14 +359,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private Map getTelemetryKeysFromLink(CalculatedFieldLink link, AttributeScope scope) { - return scope == null ? link.getConfiguration().getTimeSeries() : switch (scope) { - case CLIENT_SCOPE -> link.getConfiguration().getClientAttributes(); - case SERVER_SCOPE -> link.getConfiguration().getServerAttributes(); - case SHARED_SCOPE -> link.getConfiguration().getSharedAttributes(); - }; - } - private String getMappedKey(KvEntry entry, Map telemetry) { return telemetry.entrySet().stream() .filter(kvEntry -> kvEntry.getValue().equals(entry.getKey())) @@ -394,7 +367,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas .orElse(entry.getKey()); } - private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List calculatedFieldIds, Map updatedTelemetry) { + private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List previousCalculatedFieldIds, Map updatedTelemetry) { log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId); CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId); CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); @@ -406,12 +379,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas case ASSET_PROFILE, DEVICE_PROFILE -> { boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId); if (isCommonEntity) { - calculatedFieldCache.getEntitiesByProfile(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, calculatedFieldIds)); + calculatedFieldCache.getEntitiesByProfile(tenantId, cfEntityId).forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues, previousCalculatedFieldIds)); } else { - updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, calculatedFieldIds); + updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, previousCalculatedFieldIds); } } - default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues, calculatedFieldIds); + default -> + updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues, previousCalculatedFieldIds); } } @@ -427,14 +401,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return; } - List calculatedFieldIds = proto.getCalculatedFieldsList().stream() + List previousCalculatedFieldIds = proto.getPreviousCalculatedFieldsList().stream() .map(cfIdProto -> new CalculatedFieldId(new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) .collect(Collectors.toCollection(ArrayList::new)); Map argumentsMap = proto.getArgumentsMap().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> fromArgumentEntryProto(entry.getValue()))); CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService); - updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, calculatedFieldIds); + updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, previousCalculatedFieldIds); } catch (Exception e) { log.trace("Failed to process calculated field update state msg: [{}]", proto, e); } @@ -449,9 +423,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas EntityId newProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getNewProfileIdMSB(), proto.getNewProfileIdLSB())); log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); -// calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId); -// calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId); - calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) .forEach(cfId -> clearState(tenantId, cfId, entityId)); @@ -470,18 +441,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas log.info("Received ProfileEntityMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); if (proto.getDeleted()) { log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); -// calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId); - - - List calculatedFieldIds = Stream.concat( - calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream().map(CalculatedFieldLink::getCalculatedFieldId), - calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream().map(CalculatedFieldLink::getCalculatedFieldId) - ).toList(); - calculatedFieldIds.forEach(cfId -> clearState(tenantId, cfId, entityId)); + getCalculatedFieldLinks(tenantId, entityId, profileId) + .forEach(link -> clearState(tenantId, link.getCalculatedFieldId(), entityId)); } else { log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId); -// calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId); initializeStateForEntityByProfile(tenantId, entityId, profileId, callback); } } catch (Exception e) { @@ -490,12 +454,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private void clearState(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); if (tpi.isMyPartition()) { log.warn("Executing clearState, calculatedFieldId=[{}], entityId=[{}]", calculatedFieldId, entityId); CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId()); states.remove(ctxId); -// rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); + rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); } else { sendClearCalculatedFieldStateMsg(tenantId, calculatedFieldId, entityId); } @@ -541,12 +505,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor); } - private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List calculatedFieldIds) { + private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List previousCalculatedFieldIds) { TenantId tenantId = calculatedFieldCtx.getTenantId(); CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); Map argumentsMap = new HashMap<>(argumentValues); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); if (tpi.isMyPartition()) { CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); @@ -559,12 +523,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Consumer performUpdateState = (state) -> { if (state.updateState(argumentsMap)) { calculatedFieldEntityCtx.setState(state); -// rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); + rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); Map arguments = state.getArguments(); boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY); if (allArgsPresent) { - performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds); + performCalculation(calculatedFieldCtx, state, entityId, previousCalculatedFieldIds); } log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId); } @@ -599,17 +563,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return calculatedFieldEntityCtx; }); } else { - sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, calculatedFieldIds, argumentsMap); + sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, previousCalculatedFieldIds, argumentsMap); } } - private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List calculatedFieldIds) { + private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List previousCalculatedFieldIds) { ListenableFuture resultFuture = state.performCalculation(calculatedFieldCtx); Futures.addCallback(resultFuture, new FutureCallback<>() { @Override public void onSuccess(CalculatedFieldResult result) { if (result != null) { - pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), calculatedFieldCtx.getCfId(), entityId, result, calculatedFieldIds); + pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), calculatedFieldCtx.getCfId(), entityId, result, previousCalculatedFieldIds); } } @@ -620,20 +584,20 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, MoreExecutors.directExecutor()); } - private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List calculatedFieldIds) { + private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List previousCalculatedFieldIds) { try { OutputType type = calculatedFieldResult.getType(); TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; ObjectNode payload = createJsonPayload(calculatedFieldResult); - if (calculatedFieldIds == null) { - calculatedFieldIds = new ArrayList<>(); - } - if (calculatedFieldIds.contains(calculatedFieldId)) { + if (previousCalculatedFieldIds != null && previousCalculatedFieldIds.contains(calculatedFieldId)) { throw new IllegalArgumentException("Calculated field [" + calculatedFieldId.getId() + "] refers to itself, causing an infinite loop."); } + List calculatedFieldIds = previousCalculatedFieldIds != null + ? new ArrayList<>(previousCalculatedFieldIds) + : new ArrayList<>(); calculatedFieldIds.add(calculatedFieldId); - TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).calculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build(); + TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).previousCalculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build(); clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null); log.info("Pushed message to rule engine: originatorId=[{}]", originatorId); } catch (Exception e) { @@ -641,6 +605,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private List getCalculatedFieldLinks(TenantId tenantId, EntityId entityId, EntityId profileId) { + List links = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId)); + if (profileId != null) { + links.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId)); + } + return links; + } + private ListenableFuture fetchArguments(TenantId tenantId, EntityId entityId, Map necessaryArguments, Consumer> onComplete) { Map argumentValues = new HashMap<>(); List> futures = new ArrayList<>(); @@ -704,13 +676,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor); } - private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, List calculatedFieldIds, Map argumentValues) { + private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, List previousCalculatedFieldIds, Map argumentValues) { TransportProtos.CalculatedFieldStateMsgProto.Builder msgBuilder = createBaseCalculatedFieldStateMsg(tenantId, calculatedFieldId, entityId); if (argumentValues != null) { argumentValues.forEach((key, argumentEntry) -> msgBuilder.putArguments(key, toArgumentEntryProto(argumentEntry))); } - if (calculatedFieldIds != null) { - calculatedFieldIds.forEach(cfId -> msgBuilder.addCalculatedFields( + if (previousCalculatedFieldIds != null) { + previousCalculatedFieldIds.forEach(cfId -> msgBuilder.addPreviousCalculatedFields( TransportProtos.CalculatedFieldIdProto.newBuilder() .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()) .setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits()) @@ -794,11 +766,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) { -// String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); -// if (stateStr == null) { - return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); -// } -// return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); + String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId)); + if (stateStr == null) { + return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); + } + return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class); } private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldAttributeUpdateRequest.java b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldAttributeUpdateRequest.java index 8479ff37d7..c56217b2ce 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldAttributeUpdateRequest.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldAttributeUpdateRequest.java @@ -18,12 +18,14 @@ package org.thingsboard.server.service.cf.telemetry; import lombok.AllArgsConstructor; import lombok.Data; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import java.util.List; +import java.util.Map; @Data @AllArgsConstructor @@ -33,6 +35,15 @@ public class CalculatedFieldAttributeUpdateRequest implements CalculatedFieldTel private EntityId entityId; private AttributeScope scope; private List kvEntries; - private List calculatedFieldIds; + private List previousCalculatedFieldIds; + + @Override + public Map getTelemetryKeysFromLink(CalculatedFieldLink link) { + return switch (scope) { + case CLIENT_SCOPE -> link.getConfiguration().getClientAttributes(); + case SERVER_SCOPE -> link.getConfiguration().getServerAttributes(); + case SHARED_SCOPE -> link.getConfiguration().getSharedAttributes(); + }; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTelemetryUpdateRequest.java b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTelemetryUpdateRequest.java index 3c28833f31..98062a08db 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTelemetryUpdateRequest.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTelemetryUpdateRequest.java @@ -15,13 +15,14 @@ */ package org.thingsboard.server.service.cf.telemetry; -import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.KvEntry; import java.util.List; +import java.util.Map; public interface CalculatedFieldTelemetryUpdateRequest { @@ -29,10 +30,10 @@ public interface CalculatedFieldTelemetryUpdateRequest { EntityId getEntityId(); - AttributeScope getScope(); - List getKvEntries(); - List getCalculatedFieldIds(); + List getPreviousCalculatedFieldIds(); + + Map getTelemetryKeysFromLink(CalculatedFieldLink link); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTimeSeriesUpdateRequest.java b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTimeSeriesUpdateRequest.java index 987d899465..bd2161dca1 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTimeSeriesUpdateRequest.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTimeSeriesUpdateRequest.java @@ -17,13 +17,14 @@ package org.thingsboard.server.service.cf.telemetry; import lombok.AllArgsConstructor; import lombok.Data; -import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; +import java.util.Map; @Data @AllArgsConstructor @@ -32,11 +33,11 @@ public class CalculatedFieldTimeSeriesUpdateRequest implements CalculatedFieldTe private TenantId tenantId; private EntityId entityId; private List kvEntries; - private List calculatedFieldIds; + private List previousCalculatedFieldIds; @Override - public AttributeScope getScope() { - return null; + public Map getTelemetryKeysFromLink(CalculatedFieldLink link) { + return link.getConfiguration().getTimeSeries(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index bf3076be5c..99cbd89494 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -154,7 +154,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer if (request.isSaveLatest() && !request.isOnlyLatest()) { addEntityViewCallback(tenantId, entityId, request.getEntries()); } - calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getCalculatedFieldIds())); + addCalculatedFieldCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds()))); return saveFuture; } @@ -170,7 +170,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer ListenableFuture> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries()); addMainCallback(saveFuture, request.getCallback()); addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); - calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries(), request.getCalculatedFieldIds())); + addCalculatedFieldCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries(), request.getPreviousCalculatedFieldIds()))); } @Override @@ -243,7 +243,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer .onlyLatest(true) .callback(new FutureCallback<>() { @Override - public void onSuccess(@Nullable Void tmp) {} + public void onSuccess(@Nullable Void tmp) { + } @Override public void onFailure(Throwable t) { @@ -342,4 +343,17 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }; } + protected void addCalculatedFieldCallback(ListenableFuture saveFuture, Consumer callback) { + Futures.addCallback(saveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable T result) { + callback.accept(result); + } + + @Override + public void onFailure(Throwable t) { + } + }, tsCallBackExecutor); + } + } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 0805175f77..9d99f38c60 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -35,10 +35,10 @@ import org.thingsboard.server.common.msg.gen.MsgProtos; import org.thingsboard.server.common.msg.queue.TbMsgCallback; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; /** * Created by ashvayka on 13.01.18. @@ -67,7 +67,7 @@ public final class TbMsg implements Serializable { private final UUID correlationId; private final Integer partition; - private final List calculatedFieldIds; + private final List previousCalculatedFieldIds; @Getter(value = AccessLevel.NONE) @JsonIgnore @@ -117,7 +117,7 @@ public final class TbMsg implements Serializable { } private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, - RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID correlationId, Integer partition, List calculatedFieldIds, TbMsgProcessingCtx ctx, TbMsgCallback callback) { + RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID correlationId, Integer partition, List previousCalculatedFieldIds, TbMsgProcessingCtx ctx, TbMsgCallback callback) { this.id = id != null ? id : UUID.randomUUID(); this.queueName = queueName; if (ts > 0) { @@ -144,7 +144,9 @@ public final class TbMsg implements Serializable { this.ruleNodeId = ruleNodeId; this.correlationId = correlationId; this.partition = partition; - this.calculatedFieldIds = calculatedFieldIds; + this.previousCalculatedFieldIds = previousCalculatedFieldIds != null + ? new CopyOnWriteArrayList<>(previousCalculatedFieldIds) + : new CopyOnWriteArrayList<>(); this.ctx = ctx != null ? ctx : new TbMsgProcessingCtx(); this.callback = Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY); } @@ -192,8 +194,8 @@ public final class TbMsg implements Serializable { builder.setPartition(msg.getPartition()); } - if (msg.getCalculatedFieldIds() != null) { - for (CalculatedFieldId calculatedFieldId : msg.getCalculatedFieldIds()) { + if (msg.getPreviousCalculatedFieldIds() != null) { + for (CalculatedFieldId calculatedFieldId : msg.getPreviousCalculatedFieldIds()) { MsgProtos.CalculatedFieldIdProto calculatedFieldIdProto = MsgProtos.CalculatedFieldIdProto.newBuilder() .setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits()) .setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits()) @@ -216,7 +218,7 @@ public final class TbMsg implements Serializable { RuleNodeId ruleNodeId = null; UUID correlationId = null; Integer partition = null; - List calculatedFieldIds = new ArrayList<>(); + List calculatedFieldIds = new CopyOnWriteArrayList<>(); if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); } @@ -274,6 +276,7 @@ public final class TbMsg implements Serializable { /** * Checks if the message is still valid for processing. May be invalid if the message pack is timed-out or canceled. + * * @return 'true' if message is valid for processing, 'false' otherwise. */ public boolean isValid() { @@ -368,7 +371,7 @@ public final class TbMsg implements Serializable { protected RuleNodeId ruleNodeId; protected UUID correlationId; protected Integer partition; - protected List calculatedFieldIds; + protected List previousCalculatedFieldIds; protected TbMsgProcessingCtx ctx; protected TbMsgCallback callback; @@ -390,7 +393,7 @@ public final class TbMsg implements Serializable { this.ruleNodeId = tbMsg.ruleNodeId; this.correlationId = tbMsg.correlationId; this.partition = tbMsg.partition; - this.calculatedFieldIds = tbMsg.calculatedFieldIds; + this.previousCalculatedFieldIds = tbMsg.previousCalculatedFieldIds; this.ctx = tbMsg.ctx; this.callback = tbMsg.callback; } @@ -413,8 +416,7 @@ public final class TbMsg implements Serializable { /** *

Deprecated: This should only be used when you need to specify a custom message type that doesn't exist in the {@link TbMsgType} enum. * Prefer using {@link #type(TbMsgType)} instead. - * - * */ + */ @Deprecated public TbMsgBuilder type(String type) { this.type = type; @@ -482,8 +484,8 @@ public final class TbMsg implements Serializable { return this; } - public TbMsgBuilder calculatedFieldIds(List calculatedFieldIds) { - this.calculatedFieldIds = calculatedFieldIds; + public TbMsgBuilder previousCalculatedFieldIds(List previousCalculatedFieldIds) { + this.previousCalculatedFieldIds = previousCalculatedFieldIds; return this; } @@ -498,7 +500,7 @@ public final class TbMsg implements Serializable { } public TbMsg build() { - return new TbMsg(queueName, id, ts, internalType, type, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, calculatedFieldIds, ctx, callback); + return new TbMsg(queueName, id, ts, internalType, type, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, previousCalculatedFieldIds, ctx, callback); } public String toString() { @@ -506,7 +508,7 @@ public final class TbMsg implements Serializable { ", type=" + this.type + ", internalType=" + this.internalType + ", originator=" + this.originator + ", customerId=" + this.customerId + ", metaData=" + this.metaData + ", dataType=" + this.dataType + ", data=" + this.data + ", ruleChainId=" + this.ruleChainId + ", ruleNodeId=" + this.ruleNodeId + - ", correlationId=" + this.correlationId + ", partition=" + this.partition + ", calculatedFields=" + this.calculatedFieldIds + + ", correlationId=" + this.correlationId + ", partition=" + this.partition + ", previousCalculatedFields=" + this.previousCalculatedFieldIds + ", ctx=" + this.ctx + ", callback=" + this.callback + ")"; } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 1c4927c422..9cba62eecc 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -818,7 +818,7 @@ message CalculatedFieldStateMsgProto { int64 entityIdMSB = 6; int64 entityIdLSB = 7; bool clear = 8; - repeated CalculatedFieldIdProto calculatedFields = 9; + repeated CalculatedFieldIdProto previousCalculatedFields = 9; map arguments = 10; } diff --git a/docker/docker-compose.cluster.yml b/docker/docker-compose.cluster.yml deleted file mode 100644 index 4c74238a10..0000000000 --- a/docker/docker-compose.cluster.yml +++ /dev/null @@ -1,45 +0,0 @@ -# -# Copyright © 2016-2024 The Thingsboard Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -version: '3.0' - -services: - kafka: - restart: always - image: "bitnami/kafka:3.7.0" - ports: - - "9092:9092" - env_file: - - kafka.env - depends_on: - - zookeeper - zookeeper: - restart: always - image: "zookeeper:3.8.0" - ports: - - "2181" - environment: - ZOO_MY_ID: 1 - ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181 - ZOO_ADMINSERVER_ENABLED: "false" - redis: - restart: always - image: bitnami/redis:7.2 - environment: - # ALLOW_EMPTY_PASSWORD is recommended only for development. - ALLOW_EMPTY_PASSWORD: "yes" - ports: - - '6379:6379' diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java index 9747a6033e..406b2d0d5c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java @@ -41,7 +41,7 @@ public class AttributesSaveRequest { private final AttributeScope scope; private final List entries; private final boolean notifyDevice; - private final List calculatedFieldIds; + private final List previousCalculatedFieldIds; private final FutureCallback callback; public static Builder builder() { @@ -55,7 +55,7 @@ public class AttributesSaveRequest { private AttributeScope scope; private List entries; private boolean notifyDevice = true; - private List calculatedFieldIds; + private List previousCalculatedFieldIds; private FutureCallback callback; Builder() {} @@ -103,8 +103,8 @@ public class AttributesSaveRequest { return this; } - public Builder calculatedFieldIds(List calculatedFieldIds) { - this.calculatedFieldIds = calculatedFieldIds; + public Builder previousCalculatedFieldIds(List previousCalculatedFieldIds) { + this.previousCalculatedFieldIds = previousCalculatedFieldIds; return this; } @@ -128,7 +128,7 @@ public class AttributesSaveRequest { } public AttributesSaveRequest build() { - return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, calculatedFieldIds, callback); + return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, previousCalculatedFieldIds, callback); } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index 12afa2d939..95eb788e5f 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -41,7 +41,7 @@ public class TimeseriesSaveRequest { private final long ttl; private final boolean saveLatest; private final boolean onlyLatest; - private final List calculatedFieldIds; + private final List previousCalculatedFieldIds; private final FutureCallback callback; public static Builder builder() { @@ -58,7 +58,7 @@ public class TimeseriesSaveRequest { private FutureCallback callback; private boolean saveLatest = true; private boolean onlyLatest; - private List calculatedFieldIds; + private List previousCalculatedFieldIds; Builder() {} @@ -106,8 +106,8 @@ public class TimeseriesSaveRequest { return this; } - public Builder calculatedFieldIds(List calculatedFieldIds) { - this.calculatedFieldIds = calculatedFieldIds; + public Builder previousCalculatedFieldIds(List previousCalculatedFieldIds) { + this.previousCalculatedFieldIds = previousCalculatedFieldIds; return this; } @@ -131,7 +131,7 @@ public class TimeseriesSaveRequest { } public TimeseriesSaveRequest build() { - return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, calculatedFieldIds, callback); + return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, previousCalculatedFieldIds, callback); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 20d0dda42f..ae2fce6575 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -125,7 +125,7 @@ public class TbMsgAttributesNode implements TbNode { .scope(scope) .entries(attributes) .notifyDevice(config.isNotifyDevice() || checkNotifyDeviceMdValue(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY))) - .calculatedFieldIds(msg.getCalculatedFieldIds()) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .callback(callback) .build()); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 386e56320a..89f7844313 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -112,7 +112,7 @@ public class TbMsgTimeseriesNode implements TbNode { .entries(tsKvEntryList) .ttl(ttl) .saveLatest(!config.isSkipLatestPersistence()) - .calculatedFieldIds(msg.getCalculatedFieldIds()) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .callback(new TelemetryNodeCallback(ctx, msg)) .build()); }