From 19234df2b8e03d8d2af392faa32dfb44cc71bc77 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 5 Dec 2024 16:46:12 +0200 Subject: [PATCH] improved implementation of telemetry update --- ...efaultCalculatedFieldExecutionService.java | 60 ++++++++++++------- .../DefaultTelemetrySubscriptionService.java | 11 ++-- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 732d050a22..4a1e987687 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -177,7 +177,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas onCalculatedFieldDelete(calculatedFieldId, callback); callback.onSuccess(); } - CalculatedField cf = getOfFetchFromDb(tenantId, calculatedFieldId); + CalculatedField cf = getOrFetchFromDb(tenantId, calculatedFieldId); if (proto.getUpdated()) { log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId); boolean shouldReinit = onCalculatedFieldUpdate(cf, callback); @@ -226,14 +226,32 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry) { try { log.info("Received telemetry update msg: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId); - CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> { - CalculatedField calculatedField = getOfFetchFromDb(tenantId, id); - return new CalculatedFieldCtx(calculatedField, tbelInvokeService); - }); + CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId); + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService)); Map argumentValues = updatedTelemetry.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue()))); - updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); - log.info("Successfully updated time series for calculatedFieldId: [{}]", calculatedFieldId); + .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + if (EntityType.TENANT.equals(entityId.getEntityType()) || EntityType.CUSTOMER.equals(entityId.getEntityType())) { + updateStorage(tenantId, entityId, Optional.of(entry.getValue())); + } + return ArgumentEntry.createSingleValueArgument(entry.getValue()); + })); + + EntityId cfEntityId = calculatedField.getEntityId(); + switch (cfEntityId.getEntityType()) { + case ASSET_PROFILE, DEVICE_PROFILE -> { + boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId); + if (isCommonEntity) { + PageDataIterable entities = cfEntityId.getEntityType() == EntityType.ASSET_PROFILE + ? new PageDataIterable<>(pageLink -> assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) cfEntityId, pageLink), initFetchPackSize) + : new PageDataIterable<>(pageLink -> deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) cfEntityId, pageLink), initFetchPackSize); + entities.forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues)); + } else { + updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); + } + } + default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues); + } + log.info("Successfully updated telemetry for calculatedFieldId: [{}]", calculatedFieldId); } catch (Exception e) { log.trace("Failed to update telemetry for calculatedFieldId: [{}]", calculatedFieldId, e); } @@ -299,7 +317,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { - CalculatedField oldCalculatedField = getOfFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); + CalculatedField oldCalculatedField = getOrFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId()); boolean shouldReinit = true; if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) { onCalculatedFieldDelete(updatedCalculatedField.getId(), callback); @@ -328,7 +346,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private CalculatedField getOfFetchFromDb(TenantId tenantId, CalculatedFieldId calculatedFieldId) { + private CalculatedField getOrFetchFromDb(TenantId tenantId, CalculatedFieldId calculatedFieldId) { return calculatedFields.computeIfAbsent(calculatedFieldId, cfId -> calculatedFieldService.findById(tenantId, calculatedFieldId)); } @@ -435,17 +453,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private void updateStorage(TenantId tenantId, EntityId entityId, Optional kvEntry) { - if (kvEntry.isEmpty()) { - return; - } - List kvEntries = switch (entityId.getEntityType()) { - case TENANT -> tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>()); - case CUSTOMER -> customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>()); - default -> null; - }; - if (kvEntries != null && !kvEntries.contains(kvEntry.get())) { - kvEntries.add(kvEntry.get()); - } + kvEntry.ifPresent(entry -> { + List kvEntries = switch (entityId.getEntityType()) { + case TENANT -> tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>()); + case CUSTOMER -> customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>()); + default -> null; + }; + if (kvEntries != null) { + kvEntries.removeIf(existingEntry -> existingEntry.getKey().equals(entry.getKey())); + kvEntries.add(entry); + } + }); } private KvEntry createDefaultKvEntry(Argument argument) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index f6c19eb078..ad64204e0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -212,16 +212,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer private void updateTelemetryInCalculatedFields(TenantId tenantId, EntityId entityId, List telemetry) { EntityType entityType = entityId.getEntityType(); - if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType)) { - EntityId profileId; + if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType) || EntityType.CUSTOMER.equals(entityType) || EntityType.TENANT.equals(entityType)) { + EntityId profileId = null; if (EntityType.ASSET.equals(entityType)) { profileId = assetProfileCache.get(tenantId, (AssetId) entityId).getId(); - } else { + } else if (EntityType.DEVICE.equals(entityType)) { profileId = deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); } - List cfLinks = new ArrayList<>(); - cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId)); - cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, profileId)); + List cfLinks = new ArrayList<>(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId)); + Optional.ofNullable(profileId).ifPresent(id -> cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, id))); if (!cfLinks.isEmpty()) { cfLinks.forEach(link -> { CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();