diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 53261bf7dd..232b8a3d36 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -104,7 +104,7 @@ public class TbMsgTimeseriesNode implements TbNode { } long ts = computeTs(msg, config.isUseServerTs()); - TimeseriesSaveRequest.Strategy strategy = determineSaveActions(ts, msg.getOriginator().getId()); + TimeseriesSaveRequest.Strategy strategy = determineSaveStrategy(ts, msg.getOriginator().getId()); // short-circuit if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate()) { @@ -144,7 +144,7 @@ public class TbMsgTimeseriesNode implements TbNode { return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs(); } - private TimeseriesSaveRequest.Strategy determineSaveActions(long ts, UUID originatorUuid) { + private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) { if (persistenceSettings instanceof OnEveryMessage) { return TimeseriesSaveRequest.Strategy.SAVE_ALL; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java index 513523f8f3..601328c304 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java @@ -30,6 +30,9 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy { private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1; private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds(); + private static final int MAX_TOTAL_INTERVALS_DURATION_SECS = (int) Duration.ofDays(2L).toSeconds(); + private static final int MAX_NUMBER_OF_INTERVALS = 100; + private final long deduplicationIntervalMillis; private final LoadingCache> deduplicationCache; @@ -43,10 +46,19 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy { deduplicationCache = Caffeine.newBuilder() .softValues() .expireAfterAccess(Duration.ofSeconds(deduplicationIntervalSecs * 10L)) - .maximumSize(20L) + .maximumSize(calculateMaxNumberOfDeduplicationIntervals(deduplicationIntervalSecs)) .build(__ -> Sets.newConcurrentHashSet()); } + /** + * Calculates the maximum number of deduplication intervals we will store in the cache. + * We limit retention to two days to avoid stale data and cap it at 100 intervals to manage memory usage. + */ + private static long calculateMaxNumberOfDeduplicationIntervals(int deduplicationIntervalSecs) { + int numberOfDeduplicationIntervals = MAX_TOTAL_INTERVALS_DURATION_SECS / deduplicationIntervalSecs; + return Math.min(numberOfDeduplicationIntervals, MAX_NUMBER_OF_INTERVALS); + } + @JsonProperty("deduplicationIntervalSecs") public long getDeduplicationIntervalSecs() { return Duration.ofMillis(deduplicationIntervalMillis).toSeconds(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java index 8aeda400b4..1a6bdc831d 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategyTest.java @@ -15,10 +15,14 @@ */ package org.thingsboard.rule.engine.telemetry.strategy; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Policy; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; import java.time.Duration; +import java.util.Set; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -49,6 +53,57 @@ class DeduplicatePersistenceStrategyTest { .hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)"); } + @Test + void shouldNotAllowMoreThan100DeduplicationIntervals() { + // GIVEN + int deduplicationIntervalSecs = 1; // min deduplication interval duration + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().eviction()) + .isPresent() + .map(Policy.Eviction::getMaximum) + .hasValue(100L); + } + + @Test + void shouldCalculateMaxIntervalsAsTwoDaysDividedByIntervalDuration() { + // GIVEN + int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds(); + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().eviction()) + .isPresent() + .map(Policy.Eviction::getMaximum) + .hasValue(48L); + } + + @Test + void shouldKeepAtLeastTwoDeduplicationIntervals() { + // GIVEN + int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration + + // WHEN + strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + + // THEN + var deduplicationCache = (LoadingCache>) ReflectionTestUtils.getField(strategy, "deduplicationCache"); + + assertThat(deduplicationCache.policy().eviction()) + .isPresent() + .map(Policy.Eviction::getMaximum) + .hasValue(2L); + } + @Test void shouldReturnTrueForFirstCallInInterval() { long ts = 1_000_000L;