From 9dfc4da49bab606be616a7dcb23a69d5e697b08a Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Fri, 28 Nov 2025 15:04:48 +0200 Subject: [PATCH] Refactoring for AbstractCalculatedFieldProcessingService --- ...tractCalculatedFieldProcessingService.java | 92 ++++++++----------- 1 file changed, 39 insertions(+), 53 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index 03511da80f..6dd44a492c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import jakarta.annotation.PostConstruct; @@ -428,37 +427,21 @@ public abstract class AbstractCalculatedFieldProcessingService { JsonElement jsonResult = JsonParser.parseString(Objects.requireNonNull(cfResult.stringValue())); log.trace("[{}][{}] Saving CF result: {}", tenantId, entityId, jsonResult); - - SettableFuture future = SettableFuture.create(); switch (type) { - case ATTRIBUTES -> saveAttributes(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfResult.getScope(), cfResult.getCalculatedFieldName(), cfIds, future); - case TIME_SERIES -> saveTimeSeries(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfIds, System.currentTimeMillis(), future); + case ATTRIBUTES -> saveAttributes(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfResult.getScope(), cfResult.getCalculatedFieldName(), cfIds, callback); + case TIME_SERIES -> saveTimeSeries(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfIds, System.currentTimeMillis(), callback); } - - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(Void v) { - callback.onSuccess(); - log.debug("[{}][{}] Saved CF result: {}", tenantId, entityId, cfResult); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - log.error("[{}][{}] Failed to save CF result {}", tenantId, entityId, cfResult, t); - } - }, MoreExecutors.directExecutor()); } - private void saveAttributes(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, AttributeScope scope, String cfName, List cfIds, SettableFuture future) { + private void saveAttributes(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, AttributeScope scope, String cfName, List cfIds, TbCallback callback) { if (!(outputStrategy instanceof AttributesImmediateOutputStrategy attOutputStrategy)) { - future.setException(new IllegalArgumentException("Only AttributeImmediateOutputStrategy is supported.")); + callback.onFailure(new IllegalArgumentException("Only AttributeImmediateOutputStrategy is supported.")); } else { AttributesSaveRequest.Strategy strategy = new Strategy(attOutputStrategy.isSaveAttribute(), attOutputStrategy.isSendWsUpdate(), attOutputStrategy.isProcessCfs()); List newAttributes = JsonConverter.convertToAttributes(jsonResult); if (!attOutputStrategy.isUpdateAttributesOnlyOnValueChange()) { - saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, newAttributes, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), future); + saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, newAttributes, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), callback); return; } @@ -469,12 +452,12 @@ public abstract class AbstractCalculatedFieldProcessingService { existingAttributes -> { List changed = filterChangedAttr(existingAttributes, newAttributes); if (changed.isEmpty()) { - future.set(null); + callback.onSuccess(); return; } - saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, changed, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), future); + saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, changed, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), callback); }, - future::setException, + callback::onFailure, MoreExecutors.directExecutor()); } } @@ -486,10 +469,7 @@ public abstract class AbstractCalculatedFieldProcessingService { List entries, AttributesSaveRequest.Strategy strategy, boolean sendAttributesUpdatedNotification, - SettableFuture future) { - Runnable onSuccess = sendAttributesUpdatedNotification - ? () -> sendAttributesUpdatedMsg(tenantId, entityId, scope, cfName, entries) - : null; + TbCallback callback) { tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(entityId) @@ -497,23 +477,36 @@ public abstract class AbstractCalculatedFieldProcessingService { .entries(entries) .strategy(strategy) .previousCalculatedFieldIds(cfIds) - .callback(wrapWithSuccessHandler(future, onSuccess)) + .callback(new FutureCallback() { + @Override + public void onSuccess(Void result) { + if (sendAttributesUpdatedNotification) { + sendAttributesUpdatedMsg(tenantId, entityId, scope, cfName, entries); + } + callback.onSuccess(); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }) .build()); } - private void saveTimeSeries(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, List cfIds, long ts, SettableFuture future) { + private void saveTimeSeries(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, List cfIds, long ts, TbCallback callback) { if (!(outputStrategy instanceof TimeSeriesImmediateOutputStrategy tsOutputStrategy)) { - future.setException(new IllegalArgumentException("Only TimeSeriesImmediateOutputStrategy is supported.")); + callback.onFailure(new IllegalArgumentException("Only TimeSeriesImmediateOutputStrategy is supported.")); } else { TimeseriesSaveRequest.Strategy strategy = new TimeseriesSaveRequest.Strategy(tsOutputStrategy.isSaveTimeSeries(), tsOutputStrategy.isSaveLatest(), tsOutputStrategy.isSendWsUpdate(), tsOutputStrategy.isProcessCfs()); - saveTimeSeriesInternal(tenantId, entityId, jsonResult, tsOutputStrategy.getTtl(), cfIds, ts, strategy, future); + saveTimeSeriesInternal(tenantId, entityId, jsonResult, tsOutputStrategy.getTtl(), cfIds, ts, strategy, callback); } } - private void saveTimeSeriesInternal(TenantId tenantId, EntityId entityId, JsonElement jsonResult, Long ttl, List cfIds, long ts, TimeseriesSaveRequest.Strategy strategy, SettableFuture future) { + private void saveTimeSeriesInternal(TenantId tenantId, EntityId entityId, JsonElement jsonResult, Long ttl, List cfIds, long ts, TimeseriesSaveRequest.Strategy strategy, TbCallback callback) { Map> tsKvMap = JsonConverter.convertToTelemetry(jsonResult, ts); if (tsKvMap.isEmpty()) { - future.set(null); + callback.onSuccess(); return; } List tsEntries = toTsKvEntryList(tsKvMap); @@ -522,7 +515,17 @@ public abstract class AbstractCalculatedFieldProcessingService { .entityId(entityId) .entries(tsEntries) .strategy(strategy) - .future(future); + .callback(new FutureCallback() { + @Override + public void onSuccess(Void result) { + callback.onSuccess(); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); if (ttl != null) { builder.ttl(ttl); } @@ -554,21 +557,4 @@ public abstract class AbstractCalculatedFieldProcessingService { sendMsgToRuleEngine(tenantId, entityId, TbCallback.EMPTY, attributesUpdatedMsg); } - private FutureCallback wrapWithSuccessHandler(SettableFuture future, Runnable onSuccess) { - return new FutureCallback<>() { - @Override - public void onSuccess(Void result) { - future.set(result); - if (onSuccess != null) { - onSuccess.run(); - } - } - - @Override - public void onFailure(Throwable t) { - future.setException(t); - } - }; - } - }