diff --git a/application/src/main/data/upgrade/basic/schema_update.sql b/application/src/main/data/upgrade/basic/schema_update.sql index eccc510f42..84c6a53f0d 100644 --- a/application/src/main/data/upgrade/basic/schema_update.sql +++ b/application/src/main/data/upgrade/basic/schema_update.sql @@ -31,9 +31,10 @@ DO $$ || jsonb_build_object( 'processingSettings', jsonb_build_object( 'type', 'ADVANCED', - 'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'), - 'latest', jsonb_build_object('type', 'SKIP'), - 'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE') + 'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'), + 'latest', jsonb_build_object('type', 'SKIP'), + 'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE'), + 'calculatedFields', jsonb_build_object('type', 'ON_EVERY_MESSAGE') ) ) )::text, diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 3a351528e4..6b32083bb7 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -15,13 +15,11 @@ */ package org.thingsboard.server.service.apiusage; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; @@ -92,15 +90,7 @@ import java.util.stream.Collectors; public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService implements TbApiUsageStateService { public static final String HOURLY = "Hourly"; - public static final FutureCallback VOID_CALLBACK = new FutureCallback() { - @Override - public void onSuccess(@Nullable Void result) { - } - @Override - public void onFailure(Throwable t) { - } - }; private final PartitionService partitionService; private final TenantService tenantService; private final TimeseriesService tsService; @@ -219,7 +209,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService .tenantId(tenantId) .entityId(usageState.getApiUsageState().getId()) .entries(updatedEntries) - .callback(VOID_CALLBACK) .build()); if (!result.isEmpty()) { persistAndNotify(usageState, result); @@ -331,7 +320,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService .tenantId(tenantId) .entityId(id) .entries(profileThresholds) - .callback(VOID_CALLBACK) .build()); } } @@ -363,7 +351,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService .tenantId(state.getTenantId()) .entityId(state.getApiUsageState().getId()) .entries(stateTelemetry) - .callback(VOID_CALLBACK) .build()); if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { @@ -456,7 +443,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService .tenantId(state.getTenantId()) .entityId(state.getApiUsageState().getId()) .entries(counts) - .callback(VOID_CALLBACK) .build()); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 79222ca002..660a9faf1d 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -149,9 +149,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } else { resultFuture = Futures.immediateFuture(TimeseriesSaveResult.EMPTY); } - DonAsynchron.withCallback(resultFuture, result -> { - calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback()); - }, safeCallback(request.getCallback()), tsCallBackExecutor); + + addMainCallback(resultFuture, result -> { + if (strategy.processCalculatedFields()) { + calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback()); + } else { + request.getCallback().onSuccess(null); + } + }, t -> request.getCallback().onFailure(t)); + if (strategy.sendWsUpdate()) { addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); } diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index 693fce82b4..86e0550950 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.telemetry; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import org.checkerframework.checker.nullness.qual.NonNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -76,6 +75,7 @@ import java.util.stream.Stream; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.lenient; @@ -101,14 +101,6 @@ class DefaultTelemetrySubscriptionServiceTest { .myPartition(true) .build(); - final FutureCallback emptyCallback = new FutureCallback<>() { - @Override - public void onSuccess(Void result) {} - - @Override - public void onFailure(@NonNull Throwable t) {} - }; - ExecutorService wsCallBackExecutor; ExecutorService tsCallBackExecutor; @@ -185,8 +177,7 @@ class DefaultTelemetrySubscriptionServiceTest { .entityId(entityId) .entries(sampleTelemetry) .ttl(sampleTtl) - .strategy(new TimeseriesSaveRequest.Strategy(true, false, false)) - .callback(emptyCallback) + .strategy(new TimeseriesSaveRequest.Strategy(true, false, false, false)) .build(); // WHEN @@ -206,7 +197,6 @@ class DefaultTelemetrySubscriptionServiceTest { .entries(sampleTelemetry) .ttl(sampleTtl) .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) - .callback(emptyCallback) .build(); // WHEN @@ -228,7 +218,7 @@ class DefaultTelemetrySubscriptionServiceTest { .entityId(entityId) .entries(sampleTelemetry) .ttl(sampleTtl) - .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) .future(future) .build(); @@ -287,8 +277,7 @@ class DefaultTelemetrySubscriptionServiceTest { .entityId(entityId) .entries(sampleTelemetry) .ttl(sampleTtl) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) - .callback(emptyCallback) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) .build(); // WHEN @@ -314,8 +303,7 @@ class DefaultTelemetrySubscriptionServiceTest { .entityId(entityId) .entries(sampleTelemetry) .ttl(sampleTtl) - .strategy(new TimeseriesSaveRequest.Strategy(true, false, false)) - .callback(emptyCallback) + .strategy(new TimeseriesSaveRequest.Strategy(true, false, false, false)) .build(); // WHEN @@ -332,7 +320,7 @@ class DefaultTelemetrySubscriptionServiceTest { @ParameterizedTest @MethodSource("booleanCombinations") - void shouldCallCorrectApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) { + void shouldCallCorrectApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) { // GIVEN var request = TimeseriesSaveRequest.builder() .tenantId(tenantId) @@ -340,8 +328,7 @@ class DefaultTelemetrySubscriptionServiceTest { .entityId(entityId) .entries(sampleTelemetry) .ttl(sampleTtl) - .strategy(new TimeseriesSaveRequest.Strategy(saveTimeseries, saveLatest, sendWsUpdate)) - .callback(emptyCallback) + .strategy(new TimeseriesSaveRequest.Strategy(saveTimeseries, saveLatest, sendWsUpdate, processCalculatedFields)) .build(); // WHEN @@ -355,6 +342,11 @@ class DefaultTelemetrySubscriptionServiceTest { } else if (saveTimeseries) { then(tsService).should().saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl); } + + if (processCalculatedFields) { + then(calculatedFieldQueueService).should().pushRequestToQueue(eq(request), any(), eq(request.getCallback())); + } + then(tsService).shouldHaveNoMoreInteractions(); if (sendWsUpdate) { @@ -366,18 +358,26 @@ class DefaultTelemetrySubscriptionServiceTest { private static Stream booleanCombinations() { return Stream.of( - Arguments.of(true, true, true), - Arguments.of(true, true, false), - Arguments.of(true, false, true), - Arguments.of(true, false, false), - Arguments.of(false, true, true), - Arguments.of(false, true, false), - Arguments.of(false, false, true), - Arguments.of(false, false, false) + Arguments.of(true, true, true, true), + Arguments.of(true, true, true, false), + Arguments.of(true, true, false, true), + Arguments.of(true, true, false, false), + Arguments.of(true, false, true, true), + Arguments.of(true, false, true, false), + Arguments.of(true, false, false, true), + Arguments.of(true, false, false, false), + Arguments.of(false, true, true, true), + Arguments.of(false, true, true, false), + Arguments.of(false, true, false, true), + Arguments.of(false, true, false, false), + Arguments.of(false, false, true, true), + Arguments.of(false, false, true, false), + Arguments.of(false, false, false, true), + Arguments.of(false, false, false, false) ); } - // used to emulate sequence numbers returned by save latest API + // used to emulate versions returned by save latest API private static List listOfNNumbers(int N) { return LongStream.range(0, N).boxed().toList(); } diff --git a/common/util/src/main/java/org/thingsboard/common/util/NoOpFutureCallback.java b/common/util/src/main/java/org/thingsboard/common/util/NoOpFutureCallback.java new file mode 100644 index 0000000000..2927477849 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/NoOpFutureCallback.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import com.google.common.util.concurrent.FutureCallback; + +public enum NoOpFutureCallback implements FutureCallback { + + INSTANCE; + + @Override + public void onSuccess(Object result) {} + + @Override + public void onFailure(Throwable t) {} + + @SuppressWarnings("unchecked") + public static FutureCallback instance() { + return (FutureCallback) INSTANCE; + } + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index e3c04cae8b..02b6c42ea4 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.SettableFuture; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; +import org.thingsboard.common.util.NoOpFutureCallback; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; @@ -32,6 +33,8 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import java.util.List; import java.util.UUID; +import static java.util.Objects.requireNonNullElse; + @Getter @AllArgsConstructor(access = AccessLevel.PRIVATE) public class TimeseriesSaveRequest { @@ -47,12 +50,12 @@ public class TimeseriesSaveRequest { private final TbMsgType tbMsgType; private final FutureCallback callback; - public record Strategy(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) { + public record Strategy(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) { - public static final Strategy SAVE_ALL = new Strategy(true, true, true); - public static final Strategy WS_ONLY = new Strategy(false, false, true); - public static final Strategy LATEST_AND_WS = new Strategy(false, true, true); - public static final Strategy SKIP_ALL = new Strategy(false, false, false); + public static final Strategy PROCESS_ALL = new Strategy(true, true, true, true); + public static final Strategy WS_ONLY = new Strategy(false, false, true, false); + public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false); + public static final Strategy SKIP_ALL = new Strategy(false, false, false, false); } @@ -67,7 +70,7 @@ public class TimeseriesSaveRequest { private EntityId entityId; private List entries; private long ttl; - private Strategy strategy = Strategy.SAVE_ALL; + private Strategy strategy = Strategy.PROCESS_ALL; private List previousCalculatedFieldIds; private UUID tbMsgId; private TbMsgType tbMsgType; @@ -148,7 +151,10 @@ public class TimeseriesSaveRequest { } public TimeseriesSaveRequest build() { - return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, strategy, previousCalculatedFieldIds, tbMsgId, tbMsgType, callback); + return new TimeseriesSaveRequest( + tenantId, customerId, entityId, entries, ttl, strategy, + previousCalculatedFieldIds, tbMsgId, tbMsgType, requireNonNullElse(callback, NoOpFutureCallback.instance()) + ); } } diff --git a/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java b/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java index 321892991e..c3ed4014cc 100644 --- a/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java +++ b/rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java @@ -16,36 +16,51 @@ package org.thingsboard.rule.engine.api; import org.junit.jupiter.api.Test; +import org.thingsboard.common.util.NoOpFutureCallback; import static org.assertj.core.api.Assertions.assertThat; class TimeseriesSaveRequestTest { @Test - void testDefaultSaveStrategyIsSaveAll() { + void testDefaultSaveStrategyIsProcessAll() { var request = TimeseriesSaveRequest.builder().build(); - assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL); } @Test void testSaveAllStrategy() { - assertThat(TimeseriesSaveRequest.Strategy.SAVE_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, true)); + assertThat(TimeseriesSaveRequest.Strategy.PROCESS_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, true, true)); } @Test void testWsOnlyStrategy() { - assertThat(TimeseriesSaveRequest.Strategy.WS_ONLY).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, true)); + assertThat(TimeseriesSaveRequest.Strategy.WS_ONLY).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, true, false)); } @Test void testLatestAndWsStrategy() { - assertThat(TimeseriesSaveRequest.Strategy.LATEST_AND_WS).isEqualTo(new TimeseriesSaveRequest.Strategy(false, true, true)); + assertThat(TimeseriesSaveRequest.Strategy.LATEST_AND_WS).isEqualTo(new TimeseriesSaveRequest.Strategy(false, true, true, false)); } @Test void testSkipAllStrategy() { - assertThat(TimeseriesSaveRequest.Strategy.SKIP_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, false)); + assertThat(TimeseriesSaveRequest.Strategy.SKIP_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, false, false)); + } + + @Test + void testDefaultCallbackIsNoOp() { + var request = TimeseriesSaveRequest.builder().build(); + + assertThat(request.getCallback()).isEqualTo(NoOpFutureCallback.instance()); + } + + @Test + void testNullCallbackIsNoOp() { + var request = TimeseriesSaveRequest.builder().callback(null).build(); + + assertThat(request.getCallback()).isEqualTo(NoOpFutureCallback.instance()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index c877f8b9a4..569f2561cc 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -66,6 +66,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
  • Time series: save time series data to a ts_kv table in a DB.
  • Latest values: save time series data to a ts_kv_latest table in a DB.
  • WebSockets: notify WebSockets subscriptions about time series data updates.
  • +
  • Calculated fields: notify calculated fields about time series data updates.
  • For each action, three processing strategies are available: @@ -90,7 +91,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE By default, the timestamp is taken from metadata.ts. You can enable Use server timestamp to always use the current server time instead. This is particularly useful in sequential processing scenarios where messages may arrive with out-of-order timestamps from - multiple sources. Note that the DB layer may ignore older records for attributes and latest values, + multiple sources. Note that the DB layer may ignore "outdated" records for attributes and latest values, so enabling Use server timestamp can ensure correct ordering.

    The TTL is taken first from metadata.TTL. If absent, the node configuration’s default @@ -137,7 +138,7 @@ public class TbMsgTimeseriesNode implements TbNode { TimeseriesSaveRequest.Strategy strategy = determineSaveStrategy(ts, msg.getOriginator().getId()); // short-circuit - if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate()) { + if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate() && !strategy.processCalculatedFields()) { ctx.tellSuccess(msg); return; } @@ -179,20 +180,21 @@ public class TbMsgTimeseriesNode implements TbNode { private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) { if (processingSettings instanceof OnEveryMessage) { - return TimeseriesSaveRequest.Strategy.SAVE_ALL; + return TimeseriesSaveRequest.Strategy.PROCESS_ALL; } if (processingSettings instanceof WebSocketsOnly) { return TimeseriesSaveRequest.Strategy.WS_ONLY; } if (processingSettings instanceof Deduplicate deduplicate) { boolean isFirstMsgInInterval = deduplicate.getProcessingStrategy().shouldProcess(ts, originatorUuid); - return isFirstMsgInInterval ? TimeseriesSaveRequest.Strategy.SAVE_ALL : TimeseriesSaveRequest.Strategy.SKIP_ALL; + return isFirstMsgInInterval ? TimeseriesSaveRequest.Strategy.PROCESS_ALL : TimeseriesSaveRequest.Strategy.SKIP_ALL; } if (processingSettings instanceof Advanced advanced) { return new TimeseriesSaveRequest.Strategy( advanced.timeseries().shouldProcess(ts, originatorUuid), advanced.latest().shouldProcess(ts, originatorUuid), - advanced.webSockets().shouldProcess(ts, originatorUuid) + advanced.webSockets().shouldProcess(ts, originatorUuid), + advanced.calculatedFields().shouldProcess(ts, originatorUuid) ); } // should not happen @@ -215,6 +217,7 @@ public class TbMsgTimeseriesNode implements TbNode { var skipLatestProcessingSettings = new Advanced( ProcessingStrategy.onEveryMessage(), ProcessingStrategy.skip(), + ProcessingStrategy.onEveryMessage(), ProcessingStrategy.onEveryMessage() ); ((ObjectNode) oldConfiguration).set("processingSettings", JacksonUtil.valueToTree(skipLatestProcessingSettings)); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index 88524e3751..fe2afd8a15 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -83,12 +83,18 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration { assertThat(request.getEntries()).size().isOne(); - assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL); })); TbMsg resultMsg = msgCaptor.getValue(); @@ -569,7 +569,7 @@ public class TbMathNodeTest { verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> { assertThat(request.getEntries()).size().isOne(); - assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL); })); TbMsg resultMsg = msgCaptor.getValue(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index 976536c203..0cb6c7d50c 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -208,7 +208,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntries()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts").containsExactlyElementsOf(expectedList); assertThat(request.getTtl()).isEqualTo(extractTtlAsSeconds(tenantProfile)); - assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); })); verify(ctxMock).tellSuccess(msg); @@ -223,7 +223,8 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { var timeseriesStrategy = ProcessingStrategy.onEveryMessage(); var latestStrategy = ProcessingStrategy.skip(); var webSockets = ProcessingStrategy.onEveryMessage(); - var processingSettings = new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets); + var calculatedFields = ProcessingStrategy.onEveryMessage(); + var processingSettings = new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets, calculatedFields); config.setProcessingSettings(processingSettings); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); @@ -265,7 +266,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntries()).containsExactlyElementsOf(expectedList); assertThat(request.getTtl()).isEqualTo(config.getDefaultTTL()); - assertThat(request.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, true)); + assertThat(request.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, true, true)); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); })); verify(ctxMock).tellSuccess(msg); @@ -304,7 +305,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { assertThat(request.getCustomerId()).isNull(); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getTtl()).isEqualTo(expectedTtl); - assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); + assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); })); } @@ -353,7 +354,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { .entityId(msg.getOriginator()) .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) .ttl(extractTtlAsSeconds(tenantProfile)) - .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) @@ -391,7 +392,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { .entityId(msg.getOriginator()) .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) .ttl(extractTtlAsSeconds(tenantProfile)) - .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) @@ -450,6 +451,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { public void givenAdvancedProcessingSettingsWithOnEveryMessageStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException { // GIVEN config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced( + ProcessingStrategy.onEveryMessage(), ProcessingStrategy.onEveryMessage(), ProcessingStrategy.onEveryMessage(), ProcessingStrategy.onEveryMessage() @@ -471,10 +473,11 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { .entityId(msg.getOriginator()) .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) .ttl(extractTtlAsSeconds(tenantProfile)) - .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) + .callback(new TelemetryNodeCallback(ctxMock, msg)) .build(); node.onMsg(ctxMock, msg); @@ -494,7 +497,8 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced( ProcessingStrategy.deduplicate(1), ProcessingStrategy.deduplicate(2), - ProcessingStrategy.deduplicate(3) + ProcessingStrategy.deduplicate(3), + ProcessingStrategy.deduplicate(4) )); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); @@ -502,6 +506,8 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { long ts1 = 500L; long ts2 = 1500L; long ts3 = 2500L; + long ts4 = 3500L; + long ts5 = 4500L; // WHEN-THEN node.onMsg(ctxMock, TbMsg.newMsg() @@ -511,7 +517,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts1)))) .build()); then(telemetryServiceMock).should().saveTimeseries(assertArg( - actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL) + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL) )); clearInvocations(telemetryServiceMock); @@ -523,7 +529,9 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts2)))) .build()); then(telemetryServiceMock).should().saveTimeseries(assertArg( - actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, false)) + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo( + new TimeseriesSaveRequest.Strategy(true, false, false, false) + ) )); clearInvocations(telemetryServiceMock); @@ -535,7 +543,37 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts3)))) .build()); then(telemetryServiceMock).should().saveTimeseries(assertArg( - actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, false)) + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo( + new TimeseriesSaveRequest.Strategy(true, true, false, false) + ) + )); + + clearInvocations(telemetryServiceMock); + + node.onMsg(ctxMock, TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts4)))) + .build()); + then(telemetryServiceMock).should().saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo( + new TimeseriesSaveRequest.Strategy(true, false, true, false) + ) + )); + + clearInvocations(telemetryServiceMock); + + node.onMsg(ctxMock, TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(DEVICE_ID) + .data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString()) + .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts5)))) + .build()); + then(telemetryServiceMock).should().saveTimeseries(assertArg( + actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo( + new TimeseriesSaveRequest.Strategy(true, true, false, true) + ) )); } @@ -543,6 +581,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { public void givenAdvancedProcessingSettingsWithSkipStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenSkipsSameMessageTwoTimes() throws TbNodeException { // GIVEN config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced( + ProcessingStrategy.skip(), ProcessingStrategy.skip(), ProcessingStrategy.skip(), ProcessingStrategy.skip() @@ -643,6 +682,9 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest { }, "webSockets": { "type": "ON_EVERY_MESSAGE" + }, + "calculatedFields": { + "type": "ON_EVERY_MESSAGE" } } }""") diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts index e71055b5da..24ae289de8 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts @@ -43,11 +43,11 @@ import { MathFunctionConfigComponent } from './math-function-config.component'; import { DeviceStateConfigComponent } from './device-state-config.component'; import { SendRestApiCallReplyConfigComponent } from './send-rest-api-call-reply-config.component'; import { - AdvancedPersistenceSettingComponent -} from '@home/components/rule-node/action/advanced-persistence-setting.component'; + AdvancedProcessingSettingComponent +} from '@home/components/rule-node/action/advanced-processing-setting.component'; import { - AdvancedPersistenceSettingRowComponent -} from '@home/components/rule-node/action/advanced-persistence-setting-row.component'; + AdvancedProcessingSettingRowComponent +} from '@home/components/rule-node/action/advanced-processing-setting-row.component'; @NgModule({ declarations: [ @@ -74,8 +74,8 @@ import { PushToCloudConfigComponent, MathFunctionConfigComponent, DeviceStateConfigComponent, - AdvancedPersistenceSettingComponent, - AdvancedPersistenceSettingRowComponent, + AdvancedProcessingSettingComponent, + AdvancedProcessingSettingRowComponent, ], imports: [ CommonModule, diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting-row.component.html similarity index 80% rename from ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html rename to ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting-row.component.html index 2bb51ad504..2f67480e29 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting-row.component.html @@ -15,17 +15,17 @@ limitations under the License. --> -
    +
    {{ title }}
    rule-node-config.save-time-series.strategy - @for (strategy of persistenceStrategies; track strategy) { - {{ PersistenceTypeTranslationMap.get(strategy) | translate }} + @for (strategy of processingStrategies; track strategy) { + {{ ProcessingTypeTranslationMap.get(strategy) | translate }} } - @if(persistenceSettingRowForm.get('type').value === PersistenceType.DEDUPLICATE) { + @if (processingSettingRowForm.get('type').value === ProcessingType.DEDUPLICATE) { AdvancedPersistenceSettingRowComponent), + useExisting: forwardRef(() => AdvancedProcessingSettingRowComponent), multi: true },{ provide: NG_VALIDATORS, - useExisting: forwardRef(() => AdvancedPersistenceSettingRowComponent), + useExisting: forwardRef(() => AdvancedProcessingSettingRowComponent), multi: true }] }) -export class AdvancedPersistenceSettingRowComponent implements ControlValueAccessor, Validator { +export class AdvancedProcessingSettingRowComponent implements ControlValueAccessor, Validator { @Input() title: string; - persistenceSettingRowForm = this.fb.group({ + processingSettingRowForm = this.fb.group({ type: [defaultAdvancedProcessingConfig.type], deduplicationIntervalSecs: [{value: 60, disabled: true}] }); - PersistenceType = ProcessingType; - persistenceStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.SKIP]; - PersistenceTypeTranslationMap = ProcessingTypeTranslationMap; + ProcessingType = ProcessingType; + processingStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.SKIP]; + ProcessingTypeTranslationMap = ProcessingTypeTranslationMap; maxDeduplicateTime = maxDeduplicateTimeSecs; private propagateChange: (value: any) => void = () => {}; constructor(private fb: FormBuilder) { - this.persistenceSettingRowForm.get('type').valueChanges.pipe( + this.processingSettingRowForm.get('type').valueChanges.pipe( takeUntilDestroyed() ).subscribe(() => this.updatedValidation()); - this.persistenceSettingRowForm.valueChanges.pipe( + this.processingSettingRowForm.valueChanges.pipe( takeUntilDestroyed() ).subscribe((value) => this.propagateChange(value)); } @@ -83,32 +83,32 @@ export class AdvancedPersistenceSettingRowComponent implements ControlValueAcces setDisabledState(isDisabled: boolean) { if (isDisabled) { - this.persistenceSettingRowForm.disable({emitEvent: false}); + this.processingSettingRowForm.disable({emitEvent: false}); } else { - this.persistenceSettingRowForm.enable({emitEvent: false}); + this.processingSettingRowForm.enable({emitEvent: false}); this.updatedValidation(); } } validate(): ValidationErrors | null { - return this.persistenceSettingRowForm.valid ? null : { - persistenceSettingRow: false + return this.processingSettingRowForm.valid ? null : { + processingSettingRow: false }; } writeValue(value: AdvancedProcessingConfig) { if (isDefinedAndNotNull(value)) { - this.persistenceSettingRowForm.patchValue(value, {emitEvent: false}); + this.processingSettingRowForm.patchValue(value, {emitEvent: false}); } else { - this.persistenceSettingRowForm.patchValue(defaultAdvancedProcessingConfig); + this.processingSettingRowForm.patchValue(defaultAdvancedProcessingConfig); } } private updatedValidation() { - if (this.persistenceSettingRowForm.get('type').value === ProcessingType.DEDUPLICATE) { - this.persistenceSettingRowForm.get('deduplicationIntervalSecs').enable({emitEvent: false}); + if (this.processingSettingRowForm.get('type').value === ProcessingType.DEDUPLICATE) { + this.processingSettingRowForm.get('deduplicationIntervalSecs').enable({emitEvent: false}); } else { - this.persistenceSettingRowForm.get('deduplicationIntervalSecs').disable({emitEvent: false}) + this.processingSettingRowForm.get('deduplicationIntervalSecs').disable({emitEvent: false}) } } } diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.html similarity index 68% rename from ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html rename to ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.html index d3cbb1cdcb..9efae572ec 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.html @@ -15,22 +15,26 @@ limitations under the License. --> -
    +
    - - + - + + > +
    diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.ts similarity index 69% rename from ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts rename to ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.ts index 969f432f5f..9246f65f6d 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.ts @@ -27,30 +27,31 @@ import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; import { AdvancedProcessingStrategy } from '@home/components/rule-node/action/timeseries-config.models'; @Component({ - selector: 'tb-advanced-persistence-settings', - templateUrl: './advanced-persistence-setting.component.html', + selector: 'tb-advanced-processing-settings', + templateUrl: './advanced-processing-setting.component.html', providers: [{ provide: NG_VALUE_ACCESSOR, - useExisting: forwardRef(() => AdvancedPersistenceSettingComponent), + useExisting: forwardRef(() => AdvancedProcessingSettingComponent), multi: true },{ provide: NG_VALIDATORS, - useExisting: forwardRef(() => AdvancedPersistenceSettingComponent), + useExisting: forwardRef(() => AdvancedProcessingSettingComponent), multi: true }] }) -export class AdvancedPersistenceSettingComponent implements ControlValueAccessor, Validator { +export class AdvancedProcessingSettingComponent implements ControlValueAccessor, Validator { - persistenceForm = this.fb.group({ + processingForm = this.fb.group({ timeseries: [null], latest: [null], - webSockets: [null] + webSockets: [null], + calculatedFields: [null] }); private propagateChange: (value: any) => void = () => {}; constructor(private fb: FormBuilder) { - this.persistenceForm.valueChanges.pipe( + this.processingForm.valueChanges.pipe( takeUntilDestroyed() ).subscribe(value => this.propagateChange(value)); } @@ -64,19 +65,19 @@ export class AdvancedPersistenceSettingComponent implements ControlValueAccessor setDisabledState(isDisabled: boolean) { if (isDisabled) { - this.persistenceForm.disable({emitEvent: false}); + this.processingForm.disable({emitEvent: false}); } else { - this.persistenceForm.enable({emitEvent: false}); + this.processingForm.enable({emitEvent: false}); } } validate(): ValidationErrors | null { - return this.persistenceForm.valid ? null : { - persistenceForm: false + return this.processingForm.valid ? null : { + processingForm: false }; } writeValue(value: AdvancedProcessingStrategy) { - this.persistenceForm.patchValue(value, {emitEvent: false}); + this.processingForm.patchValue(value, {emitEvent: false}); } } diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html index 715776d780..3f0aa84f9a 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html @@ -31,13 +31,13 @@ rule-node-config.save-time-series.strategy - @for (strategy of persistenceStrategies; track strategy) { - {{ PersistenceTypeTranslationMap.get(strategy) | translate }} + @for (strategy of processingStrategies; track strategy) { + {{ ProcessingTypeTranslationMap.get(strategy) | translate }} } - @if(timeseriesConfigForm.get('processingSettings.type').value === PersistenceType.DEDUPLICATE) { + @if(timeseriesConfigForm.get('processingSettings.type').value === ProcessingType.DEDUPLICATE) { } } @else { - + > }
    diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts index d5e1d998a8..ac14fc65b5 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts @@ -18,7 +18,7 @@ import { Component } from '@angular/core'; import { FormBuilder, FormGroup, Validators } from '@angular/forms'; import { RuleNodeConfigurationComponent } from '@shared/models/rule-node.models'; import { - defaultAdvancedPersistenceStrategy, + defaultAdvancedProcessingStrategy, maxDeduplicateTimeSecs, ProcessingSettings, ProcessingSettingsForm, @@ -37,9 +37,9 @@ export class TimeseriesConfigComponent extends RuleNodeConfigurationComponent { timeseriesConfigForm: FormGroup; - PersistenceType = ProcessingType; - persistenceStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.WEBSOCKETS_ONLY]; - PersistenceTypeTranslationMap = ProcessingTypeTranslationMap; + ProcessingType = ProcessingType; + processingStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.WEBSOCKETS_ONLY]; + ProcessingTypeTranslationMap = ProcessingTypeTranslationMap; maxDeduplicateTime = maxDeduplicateTimeSecs @@ -63,14 +63,14 @@ export class TimeseriesConfigComponent extends RuleNodeConfigurationComponent { type: isAdvanced ? ProcessingType.ON_EVERY_MESSAGE : config.processingSettings.type, isAdvanced: isAdvanced, deduplicationIntervalSecs: config.processingSettings?.deduplicationIntervalSecs ?? 60, - advanced: isAdvanced ? config.processingSettings : defaultAdvancedPersistenceStrategy + advanced: isAdvanced ? config.processingSettings : defaultAdvancedProcessingStrategy } } else { processingSettings = { type: ProcessingType.ON_EVERY_MESSAGE, isAdvanced: false, deduplicationIntervalSecs: 60, - advanced: defaultAdvancedPersistenceStrategy + advanced: defaultAdvancedProcessingStrategy }; } return { diff --git a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts index e9d024663d..2899b6ee9d 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts @@ -55,14 +55,15 @@ export interface BasicProcessingSettings { type: ProcessingType; } -export interface DeduplicateProcessingStrategy extends BasicProcessingSettings{ +export interface DeduplicateProcessingStrategy extends BasicProcessingSettings { deduplicationIntervalSecs: number; } -export interface AdvancedProcessingStrategy extends BasicProcessingSettings{ +export interface AdvancedProcessingStrategy extends BasicProcessingSettings { timeseries: AdvancedProcessingConfig; latest: AdvancedProcessingConfig; webSockets: AdvancedProcessingConfig; + calculatedFields: AdvancedProcessingConfig; } export type AdvancedProcessingConfig = WithOptional; @@ -71,8 +72,9 @@ export const defaultAdvancedProcessingConfig: AdvancedProcessingConfig = { type: ProcessingType.ON_EVERY_MESSAGE } -export const defaultAdvancedPersistenceStrategy: Omit = { +export const defaultAdvancedProcessingStrategy: Omit = { timeseries: defaultAdvancedProcessingConfig, latest: defaultAdvancedProcessingConfig, webSockets: defaultAdvancedProcessingConfig, + calculatedFields: defaultAdvancedProcessingConfig, } diff --git a/ui-ngx/src/assets/help/en_US/rulenode/save_timeseries_node_advanced.md b/ui-ngx/src/assets/help/en_US/rulenode/save_timeseries_node_advanced.md index 44a037f180..18c0f60cdb 100644 --- a/ui-ngx/src/assets/help/en_US/rulenode/save_timeseries_node_advanced.md +++ b/ui-ngx/src/assets/help/en_US/rulenode/save_timeseries_node_advanced.md @@ -2,23 +2,28 @@ When configuring the processing strategies, certain combinations can lead to unexpected behavior. Consider the following scenarios: +- **Skipping database storage** + + Choosing to disable one or more persistence actions (for instance, skipping database storage for Time series or Latest values while keeping WS updates enabled) introduces the risk of having only partial data available: + - If a message is processed only for real-time notifications (WebSockets) and not stored in the database, historical queries may not match data on the dashboard. + - When processing strategies for Time series and Latest values are out-of-sync, telemetry data may be stored in one table (e.g., Time series) while the same data is absent in the other (e.g., Latest values). + - **Disabling WebSocket (WS) updates** If WS updates are disabled, any changes to the time series data won’t be pushed to dashboards (or other WS subscriptions). This means that even if a database is updated, dashboards may not display the updated data until browser page is reloaded. +- **Skipping calculated field recalculation** + + If telemetry data is saved to the database while bypassing calculated field recalculation, the aggregated value may not update to reflect the latest data. + Conversely, if the calculated field is recalculated with new data but the corresponding telemetry value is not persisted in the database, the calculated field's value might include data that isn’t stored. + - **Different deduplication intervals across actions** When you configure different deduplication intervals for actions, the same incoming message might be processed differently for each action. For example, a message might be stored immediately in the Time series table (if set to *On every message*) while not being stored in the Latest values table because its deduplication interval hasn’t elapsed. Also, if the WebSocket updates are configured with a different interval, dashboards might show updates that do not match what is stored in the database. -- **Skipping database storage** - - Choosing to disable one or more persistence actions (for instance, skipping database storage for Time series or Latest values while keeping WS updates enabled) introduces the risk of having only partial data available: - - If a message is processed only for real-time notifications (WebSockets) and not stored in the database, historical queries may not match data on the dashboard. - - When processing strategies for Time series and Latest values are out-of-sync, telemetry data may be stored in one table (e.g., Time series) while the same data is absent in the other (e.g., Latest values). - - **Deduplication cache clearing** The deduplication mechanism uses a cache to track processed messages within each interval. diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index cf6e3ba86e..9aff79102b 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -5250,7 +5250,8 @@ }, "time-series": "Time series", "latest": "Latest values", - "web-sockets": "WebSockets" + "web-sockets": "WebSockets", + "calculated-fields": "Calculated fields" }, "key-val": { "key": "Key",