From 7d8a76ce7f8e1ca15e08709bb2fcba8446578fbf Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 6 Jan 2025 15:32:23 +0200 Subject: [PATCH] Draft of the review --- .../cf/DefaultCalculatedFieldCache.java | 10 ++++------ .../AbstractSubscriptionService.java | 9 +++++++-- .../DefaultTelemetrySubscriptionService.java | 20 +++++-------------- common/proto/src/main/proto/queue.proto | 7 +++++++ 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index a4077b599b..c4293b9edc 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -122,16 +122,14 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @Override public List getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId) { List cfLinks = entityIdCalculatedFieldLinks.get(entityId); - if (cfLinks == null || cfLinks.isEmpty()) { + if (cfLinks == null) { calculatedFieldFetchLock.lock(); try { cfLinks = entityIdCalculatedFieldLinks.get(entityId); - if (cfLinks == null || cfLinks.isEmpty()) { + if (cfLinks == null) { cfLinks = calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId); - if (cfLinks != null) { - entityIdCalculatedFieldLinks.put(entityId, cfLinks); - log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks); - } + entityIdCalculatedFieldLinks.put(entityId, cfLinks); + log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks); } } finally { calculatedFieldFetchLock.unlock(); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java index e26ede2bae..0bb8f76398 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java @@ -38,6 +38,7 @@ import org.thingsboard.server.service.subscription.SubscriptionManagerService; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -99,7 +100,11 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList } protected void addWsCallback(ListenableFuture saveFuture, Consumer callback) { - Futures.addCallback(saveFuture, new FutureCallback() { + addCallback(saveFuture, callback, wsCallBackExecutor); + } + + protected void addCallback(ListenableFuture saveFuture, Consumer callback, Executor executor) { + Futures.addCallback(saveFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable T result) { callback.accept(result); @@ -108,7 +113,7 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList @Override public void onFailure(Throwable t) { } - }, wsCallBackExecutor); + }, executor); } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 99cbd89494..f873e48774 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -154,7 +154,9 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer if (request.isSaveLatest() && !request.isOnlyLatest()) { addEntityViewCallback(tenantId, entityId, request.getEntries()); } - addCalculatedFieldCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds()))); + // Use something very similar to addMainCallback. don't forget about tsCallBackExecutor. + //CalculatedFieldTimeSeriesUpdateRequest - add constructor that accepts the TimeseriesSaveRequest + addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor); return saveFuture; } @@ -170,7 +172,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer ListenableFuture> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries()); addMainCallback(saveFuture, request.getCallback()); addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); - addCalculatedFieldCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries(), request.getPreviousCalculatedFieldIds()))); + //CalculatedFieldAttributeUpdateRequest - add constructor that accepts the AttributesSaveRequest + addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor); } @Override @@ -343,17 +346,4 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }; } - protected void addCalculatedFieldCallback(ListenableFuture saveFuture, Consumer callback) { - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable T result) { - callback.accept(result); - } - - @Override - public void onFailure(Throwable t) { - } - }, tsCallBackExecutor); - } - } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 9cba62eecc..56f347f311 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -809,6 +809,13 @@ message ProfileEntityMsgProto { bool deleted = 10; } +message ToServerB { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + repeated CfIdEntityIdPair links = 3; + value = 4; +} + message CalculatedFieldStateMsgProto { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2;