Browse Source

Save time series strategies: add support for CFs

pull/12744/head
Dmytro Skarzhynets 1 year ago
parent
commit
4ebfbd5c3e
No known key found for this signature in database GPG Key ID: 2B51652F224037DF
  1. 7
      application/src/main/data/upgrade/basic/schema_update.sql
  2. 14
      application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java
  3. 12
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  4. 58
      application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java
  5. 35
      common/util/src/main/java/org/thingsboard/common/util/NoOpFutureCallback.java
  6. 20
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java
  7. 27
      rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java
  8. 13
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
  9. 8
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java
  10. 4
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java
  11. 64
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java
  12. 12
      ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts
  13. 8
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting-row.component.html
  14. 40
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting-row.component.ts
  15. 18
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.html
  16. 27
      ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.ts
  17. 10
      ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html
  18. 12
      ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts
  19. 8
      ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts
  20. 17
      ui-ngx/src/assets/help/en_US/rulenode/save_timeseries_node_advanced.md
  21. 3
      ui-ngx/src/assets/locale/locale.constant-en_US.json

7
application/src/main/data/upgrade/basic/schema_update.sql

@ -31,9 +31,10 @@ DO $$
|| jsonb_build_object( || jsonb_build_object(
'processingSettings', jsonb_build_object( 'processingSettings', jsonb_build_object(
'type', 'ADVANCED', 'type', 'ADVANCED',
'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'), 'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'),
'latest', jsonb_build_object('type', 'SKIP'), 'latest', jsonb_build_object('type', 'SKIP'),
'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE') 'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE'),
'calculatedFields', jsonb_build_object('type', 'ON_EVERY_MESSAGE')
) )
) )
)::text, )::text,

14
application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java

@ -15,13 +15,11 @@
*/ */
package org.thingsboard.server.service.apiusage; package org.thingsboard.server.service.apiusage;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
@ -92,15 +90,7 @@ import java.util.stream.Collectors;
public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService<EntityId> implements TbApiUsageStateService { public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService<EntityId> implements TbApiUsageStateService {
public static final String HOURLY = "Hourly"; public static final String HOURLY = "Hourly";
public static final FutureCallback<Void> VOID_CALLBACK = new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
}
@Override
public void onFailure(Throwable t) {
}
};
private final PartitionService partitionService; private final PartitionService partitionService;
private final TenantService tenantService; private final TenantService tenantService;
private final TimeseriesService tsService; private final TimeseriesService tsService;
@ -219,7 +209,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
.tenantId(tenantId) .tenantId(tenantId)
.entityId(usageState.getApiUsageState().getId()) .entityId(usageState.getApiUsageState().getId())
.entries(updatedEntries) .entries(updatedEntries)
.callback(VOID_CALLBACK)
.build()); .build());
if (!result.isEmpty()) { if (!result.isEmpty()) {
persistAndNotify(usageState, result); persistAndNotify(usageState, result);
@ -331,7 +320,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
.tenantId(tenantId) .tenantId(tenantId)
.entityId(id) .entityId(id)
.entries(profileThresholds) .entries(profileThresholds)
.callback(VOID_CALLBACK)
.build()); .build());
} }
} }
@ -363,7 +351,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
.tenantId(state.getTenantId()) .tenantId(state.getTenantId())
.entityId(state.getApiUsageState().getId()) .entityId(state.getApiUsageState().getId())
.entries(stateTelemetry) .entries(stateTelemetry)
.callback(VOID_CALLBACK)
.build()); .build());
if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) {
@ -456,7 +443,6 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
.tenantId(state.getTenantId()) .tenantId(state.getTenantId())
.entityId(state.getApiUsageState().getId()) .entityId(state.getApiUsageState().getId())
.entries(counts) .entries(counts)
.callback(VOID_CALLBACK)
.build()); .build());
} }

12
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -149,9 +149,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
} else { } else {
resultFuture = Futures.immediateFuture(TimeseriesSaveResult.EMPTY); resultFuture = Futures.immediateFuture(TimeseriesSaveResult.EMPTY);
} }
DonAsynchron.withCallback(resultFuture, result -> {
calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback()); addMainCallback(resultFuture, result -> {
}, safeCallback(request.getCallback()), tsCallBackExecutor); if (strategy.processCalculatedFields()) {
calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback());
} else {
request.getCallback().onSuccess(null);
}
}, t -> request.getCallback().onFailure(t));
if (strategy.sendWsUpdate()) { if (strategy.sendWsUpdate()) {
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
} }

58
application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java

@ -18,7 +18,6 @@ package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -76,6 +75,7 @@ import java.util.stream.Stream;
import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -101,14 +101,6 @@ class DefaultTelemetrySubscriptionServiceTest {
.myPartition(true) .myPartition(true)
.build(); .build();
final FutureCallback<Void> emptyCallback = new FutureCallback<>() {
@Override
public void onSuccess(Void result) {}
@Override
public void onFailure(@NonNull Throwable t) {}
};
ExecutorService wsCallBackExecutor; ExecutorService wsCallBackExecutor;
ExecutorService tsCallBackExecutor; ExecutorService tsCallBackExecutor;
@ -185,8 +177,7 @@ class DefaultTelemetrySubscriptionServiceTest {
.entityId(entityId) .entityId(entityId)
.entries(sampleTelemetry) .entries(sampleTelemetry)
.ttl(sampleTtl) .ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(true, false, false)) .strategy(new TimeseriesSaveRequest.Strategy(true, false, false, false))
.callback(emptyCallback)
.build(); .build();
// WHEN // WHEN
@ -206,7 +197,6 @@ class DefaultTelemetrySubscriptionServiceTest {
.entries(sampleTelemetry) .entries(sampleTelemetry)
.ttl(sampleTtl) .ttl(sampleTtl)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.callback(emptyCallback)
.build(); .build();
// WHEN // WHEN
@ -228,7 +218,7 @@ class DefaultTelemetrySubscriptionServiceTest {
.entityId(entityId) .entityId(entityId)
.entries(sampleTelemetry) .entries(sampleTelemetry)
.ttl(sampleTtl) .ttl(sampleTtl)
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
.future(future) .future(future)
.build(); .build();
@ -287,8 +277,7 @@ class DefaultTelemetrySubscriptionServiceTest {
.entityId(entityId) .entityId(entityId)
.entries(sampleTelemetry) .entries(sampleTelemetry)
.ttl(sampleTtl) .ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false))
.callback(emptyCallback)
.build(); .build();
// WHEN // WHEN
@ -314,8 +303,7 @@ class DefaultTelemetrySubscriptionServiceTest {
.entityId(entityId) .entityId(entityId)
.entries(sampleTelemetry) .entries(sampleTelemetry)
.ttl(sampleTtl) .ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(true, false, false)) .strategy(new TimeseriesSaveRequest.Strategy(true, false, false, false))
.callback(emptyCallback)
.build(); .build();
// WHEN // WHEN
@ -332,7 +320,7 @@ class DefaultTelemetrySubscriptionServiceTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("booleanCombinations") @MethodSource("booleanCombinations")
void shouldCallCorrectApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) { void shouldCallCorrectApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) {
// GIVEN // GIVEN
var request = TimeseriesSaveRequest.builder() var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId) .tenantId(tenantId)
@ -340,8 +328,7 @@ class DefaultTelemetrySubscriptionServiceTest {
.entityId(entityId) .entityId(entityId)
.entries(sampleTelemetry) .entries(sampleTelemetry)
.ttl(sampleTtl) .ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(saveTimeseries, saveLatest, sendWsUpdate)) .strategy(new TimeseriesSaveRequest.Strategy(saveTimeseries, saveLatest, sendWsUpdate, processCalculatedFields))
.callback(emptyCallback)
.build(); .build();
// WHEN // WHEN
@ -355,6 +342,11 @@ class DefaultTelemetrySubscriptionServiceTest {
} else if (saveTimeseries) { } else if (saveTimeseries) {
then(tsService).should().saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl); then(tsService).should().saveWithoutLatest(tenantId, entityId, sampleTelemetry, sampleTtl);
} }
if (processCalculatedFields) {
then(calculatedFieldQueueService).should().pushRequestToQueue(eq(request), any(), eq(request.getCallback()));
}
then(tsService).shouldHaveNoMoreInteractions(); then(tsService).shouldHaveNoMoreInteractions();
if (sendWsUpdate) { if (sendWsUpdate) {
@ -366,18 +358,26 @@ class DefaultTelemetrySubscriptionServiceTest {
private static Stream<Arguments> booleanCombinations() { private static Stream<Arguments> booleanCombinations() {
return Stream.of( return Stream.of(
Arguments.of(true, true, true), Arguments.of(true, true, true, true),
Arguments.of(true, true, false), Arguments.of(true, true, true, false),
Arguments.of(true, false, true), Arguments.of(true, true, false, true),
Arguments.of(true, false, false), Arguments.of(true, true, false, false),
Arguments.of(false, true, true), Arguments.of(true, false, true, true),
Arguments.of(false, true, false), Arguments.of(true, false, true, false),
Arguments.of(false, false, true), Arguments.of(true, false, false, true),
Arguments.of(false, false, false) Arguments.of(true, false, false, false),
Arguments.of(false, true, true, true),
Arguments.of(false, true, true, false),
Arguments.of(false, true, false, true),
Arguments.of(false, true, false, false),
Arguments.of(false, false, true, true),
Arguments.of(false, false, true, false),
Arguments.of(false, false, false, true),
Arguments.of(false, false, false, false)
); );
} }
// used to emulate sequence numbers returned by save latest API // used to emulate versions returned by save latest API
private static List<Long> listOfNNumbers(int N) { private static List<Long> listOfNNumbers(int N) {
return LongStream.range(0, N).boxed().toList(); return LongStream.range(0, N).boxed().toList();
} }

35
common/util/src/main/java/org/thingsboard/common/util/NoOpFutureCallback.java

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.google.common.util.concurrent.FutureCallback;
public enum NoOpFutureCallback implements FutureCallback<Object> {
INSTANCE;
@Override
public void onSuccess(Object result) {}
@Override
public void onFailure(Throwable t) {}
@SuppressWarnings("unchecked")
public static <T> FutureCallback<T> instance() {
return (FutureCallback<T>) INSTANCE;
}
}

20
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import org.thingsboard.common.util.NoOpFutureCallback;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
@ -32,6 +33,8 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import static java.util.Objects.requireNonNullElse;
@Getter @Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE) @AllArgsConstructor(access = AccessLevel.PRIVATE)
public class TimeseriesSaveRequest { public class TimeseriesSaveRequest {
@ -47,12 +50,12 @@ public class TimeseriesSaveRequest {
private final TbMsgType tbMsgType; private final TbMsgType tbMsgType;
private final FutureCallback<Void> callback; private final FutureCallback<Void> callback;
public record Strategy(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) { public record Strategy(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) {
public static final Strategy SAVE_ALL = new Strategy(true, true, true); public static final Strategy PROCESS_ALL = new Strategy(true, true, true, true);
public static final Strategy WS_ONLY = new Strategy(false, false, true); public static final Strategy WS_ONLY = new Strategy(false, false, true, false);
public static final Strategy LATEST_AND_WS = new Strategy(false, true, true); public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false);
public static final Strategy SKIP_ALL = new Strategy(false, false, false); public static final Strategy SKIP_ALL = new Strategy(false, false, false, false);
} }
@ -67,7 +70,7 @@ public class TimeseriesSaveRequest {
private EntityId entityId; private EntityId entityId;
private List<TsKvEntry> entries; private List<TsKvEntry> entries;
private long ttl; private long ttl;
private Strategy strategy = Strategy.SAVE_ALL; private Strategy strategy = Strategy.PROCESS_ALL;
private List<CalculatedFieldId> previousCalculatedFieldIds; private List<CalculatedFieldId> previousCalculatedFieldIds;
private UUID tbMsgId; private UUID tbMsgId;
private TbMsgType tbMsgType; private TbMsgType tbMsgType;
@ -148,7 +151,10 @@ public class TimeseriesSaveRequest {
} }
public TimeseriesSaveRequest build() { public TimeseriesSaveRequest build() {
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, strategy, previousCalculatedFieldIds, tbMsgId, tbMsgType, callback); return new TimeseriesSaveRequest(
tenantId, customerId, entityId, entries, ttl, strategy,
previousCalculatedFieldIds, tbMsgId, tbMsgType, requireNonNullElse(callback, NoOpFutureCallback.instance())
);
} }
} }

27
rule-engine/rule-engine-api/src/test/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequestTest.java

@ -16,36 +16,51 @@
package org.thingsboard.rule.engine.api; package org.thingsboard.rule.engine.api;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.thingsboard.common.util.NoOpFutureCallback;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
class TimeseriesSaveRequestTest { class TimeseriesSaveRequestTest {
@Test @Test
void testDefaultSaveStrategyIsSaveAll() { void testDefaultSaveStrategyIsProcessAll() {
var request = TimeseriesSaveRequest.builder().build(); var request = TimeseriesSaveRequest.builder().build();
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL);
} }
@Test @Test
void testSaveAllStrategy() { void testSaveAllStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.SAVE_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, true)); assertThat(TimeseriesSaveRequest.Strategy.PROCESS_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, true, true));
} }
@Test @Test
void testWsOnlyStrategy() { void testWsOnlyStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.WS_ONLY).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, true)); assertThat(TimeseriesSaveRequest.Strategy.WS_ONLY).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, true, false));
} }
@Test @Test
void testLatestAndWsStrategy() { void testLatestAndWsStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.LATEST_AND_WS).isEqualTo(new TimeseriesSaveRequest.Strategy(false, true, true)); assertThat(TimeseriesSaveRequest.Strategy.LATEST_AND_WS).isEqualTo(new TimeseriesSaveRequest.Strategy(false, true, true, false));
} }
@Test @Test
void testSkipAllStrategy() { void testSkipAllStrategy() {
assertThat(TimeseriesSaveRequest.Strategy.SKIP_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, false)); assertThat(TimeseriesSaveRequest.Strategy.SKIP_ALL).isEqualTo(new TimeseriesSaveRequest.Strategy(false, false, false, false));
}
@Test
void testDefaultCallbackIsNoOp() {
var request = TimeseriesSaveRequest.builder().build();
assertThat(request.getCallback()).isEqualTo(NoOpFutureCallback.instance());
}
@Test
void testNullCallbackIsNoOp() {
var request = TimeseriesSaveRequest.builder().callback(null).build();
assertThat(request.getCallback()).isEqualTo(NoOpFutureCallback.instance());
} }
} }

13
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java

@ -66,6 +66,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
<li><strong>Time series:</strong> save time series data to a <code>ts_kv</code> table in a DB.</li> <li><strong>Time series:</strong> save time series data to a <code>ts_kv</code> table in a DB.</li>
<li><strong>Latest values:</strong> save time series data to a <code>ts_kv_latest</code> table in a DB.</li> <li><strong>Latest values:</strong> save time series data to a <code>ts_kv_latest</code> table in a DB.</li>
<li><strong>WebSockets:</strong> notify WebSockets subscriptions about time series data updates.</li> <li><strong>WebSockets:</strong> notify WebSockets subscriptions about time series data updates.</li>
<li><strong>Calculated fields:</strong> notify calculated fields about time series data updates.</li>
</ul> </ul>
For each <em>action</em>, three <strong>processing strategies</strong> are available: For each <em>action</em>, three <strong>processing strategies</strong> are available:
@ -90,7 +91,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
By default, the timestamp is taken from <code>metadata.ts</code>. You can enable By default, the timestamp is taken from <code>metadata.ts</code>. You can enable
<em>Use server timestamp</em> to always use the current server time instead. This is particularly <em>Use server timestamp</em> to always use the current server time instead. This is particularly
useful in sequential processing scenarios where messages may arrive with out-of-order timestamps from useful in sequential processing scenarios where messages may arrive with out-of-order timestamps from
multiple sources. Note that the DB layer may ignore older records for attributes and latest values, multiple sources. Note that the DB layer may ignore "outdated" records for attributes and latest values,
so enabling <em>Use server timestamp</em> can ensure correct ordering. so enabling <em>Use server timestamp</em> can ensure correct ordering.
<br><br> <br><br>
The TTL is taken first from <code>metadata.TTL</code>. If absent, the node configurations default The TTL is taken first from <code>metadata.TTL</code>. If absent, the node configurations default
@ -137,7 +138,7 @@ public class TbMsgTimeseriesNode implements TbNode {
TimeseriesSaveRequest.Strategy strategy = determineSaveStrategy(ts, msg.getOriginator().getId()); TimeseriesSaveRequest.Strategy strategy = determineSaveStrategy(ts, msg.getOriginator().getId());
// short-circuit // short-circuit
if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate()) { if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate() && !strategy.processCalculatedFields()) {
ctx.tellSuccess(msg); ctx.tellSuccess(msg);
return; return;
} }
@ -179,20 +180,21 @@ public class TbMsgTimeseriesNode implements TbNode {
private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) { private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) {
if (processingSettings instanceof OnEveryMessage) { if (processingSettings instanceof OnEveryMessage) {
return TimeseriesSaveRequest.Strategy.SAVE_ALL; return TimeseriesSaveRequest.Strategy.PROCESS_ALL;
} }
if (processingSettings instanceof WebSocketsOnly) { if (processingSettings instanceof WebSocketsOnly) {
return TimeseriesSaveRequest.Strategy.WS_ONLY; return TimeseriesSaveRequest.Strategy.WS_ONLY;
} }
if (processingSettings instanceof Deduplicate deduplicate) { if (processingSettings instanceof Deduplicate deduplicate) {
boolean isFirstMsgInInterval = deduplicate.getProcessingStrategy().shouldProcess(ts, originatorUuid); boolean isFirstMsgInInterval = deduplicate.getProcessingStrategy().shouldProcess(ts, originatorUuid);
return isFirstMsgInInterval ? TimeseriesSaveRequest.Strategy.SAVE_ALL : TimeseriesSaveRequest.Strategy.SKIP_ALL; return isFirstMsgInInterval ? TimeseriesSaveRequest.Strategy.PROCESS_ALL : TimeseriesSaveRequest.Strategy.SKIP_ALL;
} }
if (processingSettings instanceof Advanced advanced) { if (processingSettings instanceof Advanced advanced) {
return new TimeseriesSaveRequest.Strategy( return new TimeseriesSaveRequest.Strategy(
advanced.timeseries().shouldProcess(ts, originatorUuid), advanced.timeseries().shouldProcess(ts, originatorUuid),
advanced.latest().shouldProcess(ts, originatorUuid), advanced.latest().shouldProcess(ts, originatorUuid),
advanced.webSockets().shouldProcess(ts, originatorUuid) advanced.webSockets().shouldProcess(ts, originatorUuid),
advanced.calculatedFields().shouldProcess(ts, originatorUuid)
); );
} }
// should not happen // should not happen
@ -215,6 +217,7 @@ public class TbMsgTimeseriesNode implements TbNode {
var skipLatestProcessingSettings = new Advanced( var skipLatestProcessingSettings = new Advanced(
ProcessingStrategy.onEveryMessage(), ProcessingStrategy.onEveryMessage(),
ProcessingStrategy.skip(), ProcessingStrategy.skip(),
ProcessingStrategy.onEveryMessage(),
ProcessingStrategy.onEveryMessage() ProcessingStrategy.onEveryMessage()
); );
((ObjectNode) oldConfiguration).set("processingSettings", JacksonUtil.valueToTree(skipLatestProcessingSettings)); ((ObjectNode) oldConfiguration).set("processingSettings", JacksonUtil.valueToTree(skipLatestProcessingSettings));

8
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java

@ -83,12 +83,18 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsg
} }
record Advanced(ProcessingStrategy timeseries, ProcessingStrategy latest, ProcessingStrategy webSockets) implements ProcessingSettings { record Advanced(
ProcessingStrategy timeseries,
ProcessingStrategy latest,
ProcessingStrategy webSockets,
ProcessingStrategy calculatedFields
) implements ProcessingSettings {
public Advanced { public Advanced {
Objects.requireNonNull(timeseries); Objects.requireNonNull(timeseries);
Objects.requireNonNull(latest); Objects.requireNonNull(latest);
Objects.requireNonNull(webSockets); Objects.requireNonNull(webSockets);
Objects.requireNonNull(calculatedFields);
} }
} }

4
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java

@ -533,7 +533,7 @@ public class TbMathNodeTest {
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> { verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
assertThat(request.getEntries()).size().isOne(); assertThat(request.getEntries()).size().isOne();
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL);
})); }));
TbMsg resultMsg = msgCaptor.getValue(); TbMsg resultMsg = msgCaptor.getValue();
@ -569,7 +569,7 @@ public class TbMathNodeTest {
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> { verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
assertThat(request.getEntries()).size().isOne(); assertThat(request.getEntries()).size().isOne();
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL);
})); }));
TbMsg resultMsg = msgCaptor.getValue(); TbMsg resultMsg = msgCaptor.getValue();

64
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java

@ -208,7 +208,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(request.getEntries()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts").containsExactlyElementsOf(expectedList); assertThat(request.getEntries()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts").containsExactlyElementsOf(expectedList);
assertThat(request.getTtl()).isEqualTo(extractTtlAsSeconds(tenantProfile)); assertThat(request.getTtl()).isEqualTo(extractTtlAsSeconds(tenantProfile));
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL);
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
})); }));
verify(ctxMock).tellSuccess(msg); verify(ctxMock).tellSuccess(msg);
@ -223,7 +223,8 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
var timeseriesStrategy = ProcessingStrategy.onEveryMessage(); var timeseriesStrategy = ProcessingStrategy.onEveryMessage();
var latestStrategy = ProcessingStrategy.skip(); var latestStrategy = ProcessingStrategy.skip();
var webSockets = ProcessingStrategy.onEveryMessage(); var webSockets = ProcessingStrategy.onEveryMessage();
var processingSettings = new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets); var calculatedFields = ProcessingStrategy.onEveryMessage();
var processingSettings = new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets, calculatedFields);
config.setProcessingSettings(processingSettings); config.setProcessingSettings(processingSettings);
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
@ -265,7 +266,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(request.getEntries()).containsExactlyElementsOf(expectedList); assertThat(request.getEntries()).containsExactlyElementsOf(expectedList);
assertThat(request.getTtl()).isEqualTo(config.getDefaultTTL()); assertThat(request.getTtl()).isEqualTo(config.getDefaultTTL());
assertThat(request.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, true)); assertThat(request.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, true, true));
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
})); }));
verify(ctxMock).tellSuccess(msg); verify(ctxMock).tellSuccess(msg);
@ -304,7 +305,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
assertThat(request.getCustomerId()).isNull(); assertThat(request.getCustomerId()).isNull();
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(request.getTtl()).isEqualTo(expectedTtl); assertThat(request.getTtl()).isEqualTo(expectedTtl);
assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL); assertThat(request.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL);
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
})); }));
} }
@ -353,7 +354,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
.entityId(msg.getOriginator()) .entityId(msg.getOriginator())
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
.ttl(extractTtlAsSeconds(tenantProfile)) .ttl(extractTtlAsSeconds(tenantProfile))
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId()) .tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType()) .tbMsgType(msg.getInternalType())
@ -391,7 +392,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
.entityId(msg.getOriginator()) .entityId(msg.getOriginator())
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
.ttl(extractTtlAsSeconds(tenantProfile)) .ttl(extractTtlAsSeconds(tenantProfile))
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId()) .tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType()) .tbMsgType(msg.getInternalType())
@ -450,6 +451,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
public void givenAdvancedProcessingSettingsWithOnEveryMessageStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException { public void givenAdvancedProcessingSettingsWithOnEveryMessageStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException {
// GIVEN // GIVEN
config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced( config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(
ProcessingStrategy.onEveryMessage(),
ProcessingStrategy.onEveryMessage(), ProcessingStrategy.onEveryMessage(),
ProcessingStrategy.onEveryMessage(), ProcessingStrategy.onEveryMessage(),
ProcessingStrategy.onEveryMessage() ProcessingStrategy.onEveryMessage()
@ -471,10 +473,11 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
.entityId(msg.getOriginator()) .entityId(msg.getOriginator())
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3))) .entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
.ttl(extractTtlAsSeconds(tenantProfile)) .ttl(extractTtlAsSeconds(tenantProfile))
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
.tbMsgId(msg.getId()) .tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType()) .tbMsgType(msg.getInternalType())
.callback(new TelemetryNodeCallback(ctxMock, msg))
.build(); .build();
node.onMsg(ctxMock, msg); node.onMsg(ctxMock, msg);
@ -494,7 +497,8 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced( config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(
ProcessingStrategy.deduplicate(1), ProcessingStrategy.deduplicate(1),
ProcessingStrategy.deduplicate(2), ProcessingStrategy.deduplicate(2),
ProcessingStrategy.deduplicate(3) ProcessingStrategy.deduplicate(3),
ProcessingStrategy.deduplicate(4)
)); ));
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
@ -502,6 +506,8 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
long ts1 = 500L; long ts1 = 500L;
long ts2 = 1500L; long ts2 = 1500L;
long ts3 = 2500L; long ts3 = 2500L;
long ts4 = 3500L;
long ts5 = 4500L;
// WHEN-THEN // WHEN-THEN
node.onMsg(ctxMock, TbMsg.newMsg() node.onMsg(ctxMock, TbMsg.newMsg()
@ -511,7 +517,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts1)))) .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts1))))
.build()); .build());
then(telemetryServiceMock).should().saveTimeseries(assertArg( then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.SAVE_ALL) actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
)); ));
clearInvocations(telemetryServiceMock); clearInvocations(telemetryServiceMock);
@ -523,7 +529,9 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts2)))) .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts2))))
.build()); .build());
then(telemetryServiceMock).should().saveTimeseries(assertArg( then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, false, false)) actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(
new TimeseriesSaveRequest.Strategy(true, false, false, false)
)
)); ));
clearInvocations(telemetryServiceMock); clearInvocations(telemetryServiceMock);
@ -535,7 +543,37 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts3)))) .metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts3))))
.build()); .build());
then(telemetryServiceMock).should().saveTimeseries(assertArg( then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(new TimeseriesSaveRequest.Strategy(true, true, false)) actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(
new TimeseriesSaveRequest.Strategy(true, true, false, false)
)
));
clearInvocations(telemetryServiceMock);
node.onMsg(ctxMock, TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts4))))
.build());
then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(
new TimeseriesSaveRequest.Strategy(true, false, true, false)
)
));
clearInvocations(telemetryServiceMock);
node.onMsg(ctxMock, TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(DEVICE_ID)
.data(JacksonUtil.newObjectNode().put("temperature", 22.3).toString())
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts5))))
.build());
then(telemetryServiceMock).should().saveTimeseries(assertArg(
actualSaveRequest -> assertThat(actualSaveRequest.getStrategy()).isEqualTo(
new TimeseriesSaveRequest.Strategy(true, true, false, true)
)
)); ));
} }
@ -543,6 +581,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
public void givenAdvancedProcessingSettingsWithSkipStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenSkipsSameMessageTwoTimes() throws TbNodeException { public void givenAdvancedProcessingSettingsWithSkipStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenSkipsSameMessageTwoTimes() throws TbNodeException {
// GIVEN // GIVEN
config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced( config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(
ProcessingStrategy.skip(),
ProcessingStrategy.skip(), ProcessingStrategy.skip(),
ProcessingStrategy.skip(), ProcessingStrategy.skip(),
ProcessingStrategy.skip() ProcessingStrategy.skip()
@ -643,6 +682,9 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
}, },
"webSockets": { "webSockets": {
"type": "ON_EVERY_MESSAGE" "type": "ON_EVERY_MESSAGE"
},
"calculatedFields": {
"type": "ON_EVERY_MESSAGE"
} }
} }
}""") }""")

12
ui-ngx/src/app/modules/home/components/rule-node/action/action-rule-node-config.module.ts

@ -43,11 +43,11 @@ import { MathFunctionConfigComponent } from './math-function-config.component';
import { DeviceStateConfigComponent } from './device-state-config.component'; import { DeviceStateConfigComponent } from './device-state-config.component';
import { SendRestApiCallReplyConfigComponent } from './send-rest-api-call-reply-config.component'; import { SendRestApiCallReplyConfigComponent } from './send-rest-api-call-reply-config.component';
import { import {
AdvancedPersistenceSettingComponent AdvancedProcessingSettingComponent
} from '@home/components/rule-node/action/advanced-persistence-setting.component'; } from '@home/components/rule-node/action/advanced-processing-setting.component';
import { import {
AdvancedPersistenceSettingRowComponent AdvancedProcessingSettingRowComponent
} from '@home/components/rule-node/action/advanced-persistence-setting-row.component'; } from '@home/components/rule-node/action/advanced-processing-setting-row.component';
@NgModule({ @NgModule({
declarations: [ declarations: [
@ -74,8 +74,8 @@ import {
PushToCloudConfigComponent, PushToCloudConfigComponent,
MathFunctionConfigComponent, MathFunctionConfigComponent,
DeviceStateConfigComponent, DeviceStateConfigComponent,
AdvancedPersistenceSettingComponent, AdvancedProcessingSettingComponent,
AdvancedPersistenceSettingRowComponent, AdvancedProcessingSettingRowComponent,
], ],
imports: [ imports: [
CommonModule, CommonModule,

8
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.html → ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting-row.component.html

@ -15,17 +15,17 @@
limitations under the License. limitations under the License.
--> -->
<section [formGroup]="persistenceSettingRowForm" class="tb-form-panel stroked no-gap no-padding-bottom"> <section [formGroup]="processingSettingRowForm" class="tb-form-panel stroked no-gap no-padding-bottom">
<div class="tb-form-panel-title mb-4">{{ title }}</div> <div class="tb-form-panel-title mb-4">{{ title }}</div>
<mat-form-field> <mat-form-field>
<mat-label translate>rule-node-config.save-time-series.strategy</mat-label> <mat-label translate>rule-node-config.save-time-series.strategy</mat-label>
<mat-select formControlName="type"> <mat-select formControlName="type">
@for (strategy of persistenceStrategies; track strategy) { @for (strategy of processingStrategies; track strategy) {
<mat-option [value]="strategy">{{ PersistenceTypeTranslationMap.get(strategy) | translate }}</mat-option> <mat-option [value]="strategy">{{ ProcessingTypeTranslationMap.get(strategy) | translate }}</mat-option>
} }
</mat-select> </mat-select>
</mat-form-field> </mat-form-field>
@if(persistenceSettingRowForm.get('type').value === PersistenceType.DEDUPLICATE) { @if (processingSettingRowForm.get('type').value === ProcessingType.DEDUPLICATE) {
<tb-time-unit-input <tb-time-unit-input
required required
labelText="{{ 'rule-node-config.save-time-series.deduplication-interval' | translate }}" labelText="{{ 'rule-node-config.save-time-series.deduplication-interval' | translate }}"

40
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting-row.component.ts → ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting-row.component.ts

@ -34,42 +34,42 @@ import { isDefinedAndNotNull } from '@core/utils';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
@Component({ @Component({
selector: 'tb-advanced-persistence-setting-row', selector: 'tb-advanced-processing-setting-row',
templateUrl: './advanced-persistence-setting-row.component.html', templateUrl: './advanced-processing-setting-row.component.html',
providers: [{ providers: [{
provide: NG_VALUE_ACCESSOR, provide: NG_VALUE_ACCESSOR,
useExisting: forwardRef(() => AdvancedPersistenceSettingRowComponent), useExisting: forwardRef(() => AdvancedProcessingSettingRowComponent),
multi: true multi: true
},{ },{
provide: NG_VALIDATORS, provide: NG_VALIDATORS,
useExisting: forwardRef(() => AdvancedPersistenceSettingRowComponent), useExisting: forwardRef(() => AdvancedProcessingSettingRowComponent),
multi: true multi: true
}] }]
}) })
export class AdvancedPersistenceSettingRowComponent implements ControlValueAccessor, Validator { export class AdvancedProcessingSettingRowComponent implements ControlValueAccessor, Validator {
@Input() @Input()
title: string; title: string;
persistenceSettingRowForm = this.fb.group({ processingSettingRowForm = this.fb.group({
type: [defaultAdvancedProcessingConfig.type], type: [defaultAdvancedProcessingConfig.type],
deduplicationIntervalSecs: [{value: 60, disabled: true}] deduplicationIntervalSecs: [{value: 60, disabled: true}]
}); });
PersistenceType = ProcessingType; ProcessingType = ProcessingType;
persistenceStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.SKIP]; processingStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.SKIP];
PersistenceTypeTranslationMap = ProcessingTypeTranslationMap; ProcessingTypeTranslationMap = ProcessingTypeTranslationMap;
maxDeduplicateTime = maxDeduplicateTimeSecs; maxDeduplicateTime = maxDeduplicateTimeSecs;
private propagateChange: (value: any) => void = () => {}; private propagateChange: (value: any) => void = () => {};
constructor(private fb: FormBuilder) { constructor(private fb: FormBuilder) {
this.persistenceSettingRowForm.get('type').valueChanges.pipe( this.processingSettingRowForm.get('type').valueChanges.pipe(
takeUntilDestroyed() takeUntilDestroyed()
).subscribe(() => this.updatedValidation()); ).subscribe(() => this.updatedValidation());
this.persistenceSettingRowForm.valueChanges.pipe( this.processingSettingRowForm.valueChanges.pipe(
takeUntilDestroyed() takeUntilDestroyed()
).subscribe((value) => this.propagateChange(value)); ).subscribe((value) => this.propagateChange(value));
} }
@ -83,32 +83,32 @@ export class AdvancedPersistenceSettingRowComponent implements ControlValueAcces
setDisabledState(isDisabled: boolean) { setDisabledState(isDisabled: boolean) {
if (isDisabled) { if (isDisabled) {
this.persistenceSettingRowForm.disable({emitEvent: false}); this.processingSettingRowForm.disable({emitEvent: false});
} else { } else {
this.persistenceSettingRowForm.enable({emitEvent: false}); this.processingSettingRowForm.enable({emitEvent: false});
this.updatedValidation(); this.updatedValidation();
} }
} }
validate(): ValidationErrors | null { validate(): ValidationErrors | null {
return this.persistenceSettingRowForm.valid ? null : { return this.processingSettingRowForm.valid ? null : {
persistenceSettingRow: false processingSettingRow: false
}; };
} }
writeValue(value: AdvancedProcessingConfig) { writeValue(value: AdvancedProcessingConfig) {
if (isDefinedAndNotNull(value)) { if (isDefinedAndNotNull(value)) {
this.persistenceSettingRowForm.patchValue(value, {emitEvent: false}); this.processingSettingRowForm.patchValue(value, {emitEvent: false});
} else { } else {
this.persistenceSettingRowForm.patchValue(defaultAdvancedProcessingConfig); this.processingSettingRowForm.patchValue(defaultAdvancedProcessingConfig);
} }
} }
private updatedValidation() { private updatedValidation() {
if (this.persistenceSettingRowForm.get('type').value === ProcessingType.DEDUPLICATE) { if (this.processingSettingRowForm.get('type').value === ProcessingType.DEDUPLICATE) {
this.persistenceSettingRowForm.get('deduplicationIntervalSecs').enable({emitEvent: false}); this.processingSettingRowForm.get('deduplicationIntervalSecs').enable({emitEvent: false});
} else { } else {
this.persistenceSettingRowForm.get('deduplicationIntervalSecs').disable({emitEvent: false}) this.processingSettingRowForm.get('deduplicationIntervalSecs').disable({emitEvent: false})
} }
} }
} }

18
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.html → ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.html

@ -15,22 +15,26 @@
limitations under the License. limitations under the License.
--> -->
<section [formGroup]="persistenceForm" class="tb-form-panel no-border no-padding"> <section [formGroup]="processingForm" class="tb-form-panel no-border no-padding">
<tb-example-hint <tb-example-hint
[hintText]="'rule-node-config.save-time-series.advanced-settings-hint'" [hintText]="'rule-node-config.save-time-series.advanced-settings-hint'"
[popupHelpLink]="'rulenode/save_timeseries_node_advanced'" [popupHelpLink]="'rulenode/save_timeseries_node_advanced'"
> >
</tb-example-hint> </tb-example-hint>
<tb-advanced-persistence-setting-row <tb-advanced-processing-setting-row
formControlName="timeseries" formControlName="timeseries"
title="{{ 'rule-node-config.save-time-series.time-series' | translate }}" title="{{ 'rule-node-config.save-time-series.time-series' | translate }}"
></tb-advanced-persistence-setting-row> ></tb-advanced-processing-setting-row>
<tb-advanced-persistence-setting-row <tb-advanced-processing-setting-row
formControlName="latest" formControlName="latest"
title="{{ 'rule-node-config.save-time-series.latest' | translate }}" title="{{ 'rule-node-config.save-time-series.latest' | translate }}"
></tb-advanced-persistence-setting-row> ></tb-advanced-processing-setting-row>
<tb-advanced-persistence-setting-row <tb-advanced-processing-setting-row
formControlName="webSockets" formControlName="webSockets"
title="{{ 'rule-node-config.save-time-series.web-sockets' | translate }}" title="{{ 'rule-node-config.save-time-series.web-sockets' | translate }}"
></tb-advanced-persistence-setting-row> ></tb-advanced-processing-setting-row>
<tb-advanced-processing-setting-row
formControlName="calculatedFields"
title="{{ 'rule-node-config.save-time-series.calculated-fields' | translate }}"
></tb-advanced-processing-setting-row>
</section> </section>

27
ui-ngx/src/app/modules/home/components/rule-node/action/advanced-persistence-setting.component.ts → ui-ngx/src/app/modules/home/components/rule-node/action/advanced-processing-setting.component.ts

@ -27,30 +27,31 @@ import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { AdvancedProcessingStrategy } from '@home/components/rule-node/action/timeseries-config.models'; import { AdvancedProcessingStrategy } from '@home/components/rule-node/action/timeseries-config.models';
@Component({ @Component({
selector: 'tb-advanced-persistence-settings', selector: 'tb-advanced-processing-settings',
templateUrl: './advanced-persistence-setting.component.html', templateUrl: './advanced-processing-setting.component.html',
providers: [{ providers: [{
provide: NG_VALUE_ACCESSOR, provide: NG_VALUE_ACCESSOR,
useExisting: forwardRef(() => AdvancedPersistenceSettingComponent), useExisting: forwardRef(() => AdvancedProcessingSettingComponent),
multi: true multi: true
},{ },{
provide: NG_VALIDATORS, provide: NG_VALIDATORS,
useExisting: forwardRef(() => AdvancedPersistenceSettingComponent), useExisting: forwardRef(() => AdvancedProcessingSettingComponent),
multi: true multi: true
}] }]
}) })
export class AdvancedPersistenceSettingComponent implements ControlValueAccessor, Validator { export class AdvancedProcessingSettingComponent implements ControlValueAccessor, Validator {
persistenceForm = this.fb.group({ processingForm = this.fb.group({
timeseries: [null], timeseries: [null],
latest: [null], latest: [null],
webSockets: [null] webSockets: [null],
calculatedFields: [null]
}); });
private propagateChange: (value: any) => void = () => {}; private propagateChange: (value: any) => void = () => {};
constructor(private fb: FormBuilder) { constructor(private fb: FormBuilder) {
this.persistenceForm.valueChanges.pipe( this.processingForm.valueChanges.pipe(
takeUntilDestroyed() takeUntilDestroyed()
).subscribe(value => this.propagateChange(value)); ).subscribe(value => this.propagateChange(value));
} }
@ -64,19 +65,19 @@ export class AdvancedPersistenceSettingComponent implements ControlValueAccessor
setDisabledState(isDisabled: boolean) { setDisabledState(isDisabled: boolean) {
if (isDisabled) { if (isDisabled) {
this.persistenceForm.disable({emitEvent: false}); this.processingForm.disable({emitEvent: false});
} else { } else {
this.persistenceForm.enable({emitEvent: false}); this.processingForm.enable({emitEvent: false});
} }
} }
validate(): ValidationErrors | null { validate(): ValidationErrors | null {
return this.persistenceForm.valid ? null : { return this.processingForm.valid ? null : {
persistenceForm: false processingForm: false
}; };
} }
writeValue(value: AdvancedProcessingStrategy) { writeValue(value: AdvancedProcessingStrategy) {
this.persistenceForm.patchValue(value, {emitEvent: false}); this.processingForm.patchValue(value, {emitEvent: false});
} }
} }

10
ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.html

@ -31,13 +31,13 @@
<mat-form-field> <mat-form-field>
<mat-label translate>rule-node-config.save-time-series.strategy</mat-label> <mat-label translate>rule-node-config.save-time-series.strategy</mat-label>
<mat-select formControlName="type"> <mat-select formControlName="type">
@for (strategy of persistenceStrategies; track strategy) { @for (strategy of processingStrategies; track strategy) {
<mat-option [value]="strategy">{{ PersistenceTypeTranslationMap.get(strategy) | translate }}</mat-option> <mat-option [value]="strategy">{{ ProcessingTypeTranslationMap.get(strategy) | translate }}</mat-option>
} }
</mat-select> </mat-select>
</mat-form-field> </mat-form-field>
@if(timeseriesConfigForm.get('processingSettings.type').value === PersistenceType.DEDUPLICATE) { @if(timeseriesConfigForm.get('processingSettings.type').value === ProcessingType.DEDUPLICATE) {
<tb-time-unit-input <tb-time-unit-input
required required
labelText="{{ 'rule-node-config.save-time-series.deduplication-interval' | translate }}" labelText="{{ 'rule-node-config.save-time-series.deduplication-interval' | translate }}"
@ -50,10 +50,10 @@
</tb-time-unit-input> </tb-time-unit-input>
} }
} @else { } @else {
<tb-advanced-persistence-settings <tb-advanced-processing-settings
class="mb-4" class="mb-4"
formControlName="advanced" formControlName="advanced"
></tb-advanced-persistence-settings> ></tb-advanced-processing-settings>
} }
</div> </div>
<section class="tb-form-panel stroked"> <section class="tb-form-panel stroked">

12
ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.component.ts

@ -18,7 +18,7 @@ import { Component } from '@angular/core';
import { FormBuilder, FormGroup, Validators } from '@angular/forms'; import { FormBuilder, FormGroup, Validators } from '@angular/forms';
import { RuleNodeConfigurationComponent } from '@shared/models/rule-node.models'; import { RuleNodeConfigurationComponent } from '@shared/models/rule-node.models';
import { import {
defaultAdvancedPersistenceStrategy, defaultAdvancedProcessingStrategy,
maxDeduplicateTimeSecs, maxDeduplicateTimeSecs,
ProcessingSettings, ProcessingSettings,
ProcessingSettingsForm, ProcessingSettingsForm,
@ -37,9 +37,9 @@ export class TimeseriesConfigComponent extends RuleNodeConfigurationComponent {
timeseriesConfigForm: FormGroup; timeseriesConfigForm: FormGroup;
PersistenceType = ProcessingType; ProcessingType = ProcessingType;
persistenceStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.WEBSOCKETS_ONLY]; processingStrategies = [ProcessingType.ON_EVERY_MESSAGE, ProcessingType.DEDUPLICATE, ProcessingType.WEBSOCKETS_ONLY];
PersistenceTypeTranslationMap = ProcessingTypeTranslationMap; ProcessingTypeTranslationMap = ProcessingTypeTranslationMap;
maxDeduplicateTime = maxDeduplicateTimeSecs maxDeduplicateTime = maxDeduplicateTimeSecs
@ -63,14 +63,14 @@ export class TimeseriesConfigComponent extends RuleNodeConfigurationComponent {
type: isAdvanced ? ProcessingType.ON_EVERY_MESSAGE : config.processingSettings.type, type: isAdvanced ? ProcessingType.ON_EVERY_MESSAGE : config.processingSettings.type,
isAdvanced: isAdvanced, isAdvanced: isAdvanced,
deduplicationIntervalSecs: config.processingSettings?.deduplicationIntervalSecs ?? 60, deduplicationIntervalSecs: config.processingSettings?.deduplicationIntervalSecs ?? 60,
advanced: isAdvanced ? config.processingSettings : defaultAdvancedPersistenceStrategy advanced: isAdvanced ? config.processingSettings : defaultAdvancedProcessingStrategy
} }
} else { } else {
processingSettings = { processingSettings = {
type: ProcessingType.ON_EVERY_MESSAGE, type: ProcessingType.ON_EVERY_MESSAGE,
isAdvanced: false, isAdvanced: false,
deduplicationIntervalSecs: 60, deduplicationIntervalSecs: 60,
advanced: defaultAdvancedPersistenceStrategy advanced: defaultAdvancedProcessingStrategy
}; };
} }
return { return {

8
ui-ngx/src/app/modules/home/components/rule-node/action/timeseries-config.models.ts

@ -55,14 +55,15 @@ export interface BasicProcessingSettings {
type: ProcessingType; type: ProcessingType;
} }
export interface DeduplicateProcessingStrategy extends BasicProcessingSettings{ export interface DeduplicateProcessingStrategy extends BasicProcessingSettings {
deduplicationIntervalSecs: number; deduplicationIntervalSecs: number;
} }
export interface AdvancedProcessingStrategy extends BasicProcessingSettings{ export interface AdvancedProcessingStrategy extends BasicProcessingSettings {
timeseries: AdvancedProcessingConfig; timeseries: AdvancedProcessingConfig;
latest: AdvancedProcessingConfig; latest: AdvancedProcessingConfig;
webSockets: AdvancedProcessingConfig; webSockets: AdvancedProcessingConfig;
calculatedFields: AdvancedProcessingConfig;
} }
export type AdvancedProcessingConfig = WithOptional<DeduplicateProcessingStrategy, 'deduplicationIntervalSecs'>; export type AdvancedProcessingConfig = WithOptional<DeduplicateProcessingStrategy, 'deduplicationIntervalSecs'>;
@ -71,8 +72,9 @@ export const defaultAdvancedProcessingConfig: AdvancedProcessingConfig = {
type: ProcessingType.ON_EVERY_MESSAGE type: ProcessingType.ON_EVERY_MESSAGE
} }
export const defaultAdvancedPersistenceStrategy: Omit<AdvancedProcessingStrategy, 'type'> = { export const defaultAdvancedProcessingStrategy: Omit<AdvancedProcessingStrategy, 'type'> = {
timeseries: defaultAdvancedProcessingConfig, timeseries: defaultAdvancedProcessingConfig,
latest: defaultAdvancedProcessingConfig, latest: defaultAdvancedProcessingConfig,
webSockets: defaultAdvancedProcessingConfig, webSockets: defaultAdvancedProcessingConfig,
calculatedFields: defaultAdvancedProcessingConfig,
} }

17
ui-ngx/src/assets/help/en_US/rulenode/save_timeseries_node_advanced.md

@ -2,23 +2,28 @@
When configuring the processing strategies, certain combinations can lead to unexpected behavior. Consider the following scenarios: When configuring the processing strategies, certain combinations can lead to unexpected behavior. Consider the following scenarios:
- **Skipping database storage**
Choosing to disable one or more persistence actions (for instance, skipping database storage for Time series or Latest values while keeping WS updates enabled) introduces the risk of having only partial data available:
- If a message is processed only for real-time notifications (WebSockets) and not stored in the database, historical queries may not match data on the dashboard.
- When processing strategies for Time series and Latest values are out-of-sync, telemetry data may be stored in one table (e.g., Time series) while the same data is absent in the other (e.g., Latest values).
- **Disabling WebSocket (WS) updates** - **Disabling WebSocket (WS) updates**
If WS updates are disabled, any changes to the time series data won’t be pushed to dashboards (or other WS subscriptions). If WS updates are disabled, any changes to the time series data won’t be pushed to dashboards (or other WS subscriptions).
This means that even if a database is updated, dashboards may not display the updated data until browser page is reloaded. This means that even if a database is updated, dashboards may not display the updated data until browser page is reloaded.
- **Skipping calculated field recalculation**
If telemetry data is saved to the database while bypassing calculated field recalculation, the aggregated value may not update to reflect the latest data.
Conversely, if the calculated field is recalculated with new data but the corresponding telemetry value is not persisted in the database, the calculated field's value might include data that isn’t stored.
- **Different deduplication intervals across actions** - **Different deduplication intervals across actions**
When you configure different deduplication intervals for actions, the same incoming message might be processed differently for each action. When you configure different deduplication intervals for actions, the same incoming message might be processed differently for each action.
For example, a message might be stored immediately in the Time series table (if set to *On every message*) while not being stored in the Latest values table because its deduplication interval hasn’t elapsed. For example, a message might be stored immediately in the Time series table (if set to *On every message*) while not being stored in the Latest values table because its deduplication interval hasn’t elapsed.
Also, if the WebSocket updates are configured with a different interval, dashboards might show updates that do not match what is stored in the database. Also, if the WebSocket updates are configured with a different interval, dashboards might show updates that do not match what is stored in the database.
- **Skipping database storage**
Choosing to disable one or more persistence actions (for instance, skipping database storage for Time series or Latest values while keeping WS updates enabled) introduces the risk of having only partial data available:
- If a message is processed only for real-time notifications (WebSockets) and not stored in the database, historical queries may not match data on the dashboard.
- When processing strategies for Time series and Latest values are out-of-sync, telemetry data may be stored in one table (e.g., Time series) while the same data is absent in the other (e.g., Latest values).
- **Deduplication cache clearing** - **Deduplication cache clearing**
The deduplication mechanism uses a cache to track processed messages within each interval. The deduplication mechanism uses a cache to track processed messages within each interval.

3
ui-ngx/src/assets/locale/locale.constant-en_US.json

@ -5250,7 +5250,8 @@
}, },
"time-series": "Time series", "time-series": "Time series",
"latest": "Latest values", "latest": "Latest values",
"web-sockets": "WebSockets" "web-sockets": "WebSockets",
"calculated-fields": "Calculated fields"
}, },
"key-val": { "key-val": {
"key": "Key", "key": "Key",

Loading…
Cancel
Save