diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4e5458bd18..7cc1fb9761 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -491,6 +491,10 @@ cache: attributes: # make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random' enabled: "${CACHE_ATTRIBUTES_ENABLED:true}" + ts_latest: + # Will enable cache-aside strategy for SQL timeseries latest DAO. + # make sure that if cache.type is 'redis' and cache.ts_latest.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random' + enabled: "${CACHE_TS_LATEST_ENABLED:true}" specs: relations: timeToLiveInMinutes: "${CACHE_SPECS_RELATIONS_TTL:1440}" # Relations cache TTL @@ -547,6 +551,9 @@ cache: attributes: timeToLiveInMinutes: "${CACHE_SPECS_ATTRIBUTES_TTL:1440}" # Attributes cache TTL maxSize: "${CACHE_SPECS_ATTRIBUTES_MAX_SIZE:100000}" # 0 means the cache is disabled + tsLatest: + timeToLiveInMinutes: "${CACHE_SPECS_TS_LATEST_TTL:1440}" # Timeseries latest cache TTL + maxSize: "${CACHE_SPECS_TS_LATEST_MAX_SIZE:100000}" # 0 means the cache is disabled userSessionsInvalidation: # The value of this TTL is ignored and replaced by the JWT refresh token expiration time timeToLiveInMinutes: "0" diff --git a/build.sh b/build.sh new file mode 100755 index 0000000000..2c6a7d23fa --- /dev/null +++ b/build.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# +# Copyright © 2016-2024 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e # exit on any error + +#PROJECTS="msa/tb-node,msa/web-ui,rule-engine-pe/rule-node-twilio-sms" +PROJECTS="" + +if [ "$1" ]; then + PROJECTS="--projects $1" +fi + +echo "Building and pushing [amd64,arm64] projects '$PROJECTS' ..." +echo "HELP: usage ./build.sh [projects]" +echo "HELP: example ./build.sh msa/web-ui,msa/web-report" +java -version +#echo "Cleaning ui-ngx/node_modules" && rm -rf ui-ngx/node_modules + +MAVEN_OPTS="-Xmx1024m" NODE_OPTIONS="--max_old_space_size=4096" DOCKER_CLI_EXPERIMENTAL=enabled DOCKER_BUILDKIT=0 \ +mvn -T2 license:format clean install -DskipTests \ + $PROJECTS --also-make +# \ +# -Dpush-docker-amd-arm-images +# -Ddockerfile.skip=false -Dpush-docker-image=true +# --offline +# --projects '!msa/web-report' --also-make + +# push all +# mvn -T 1C license:format clean install -DskipTests -Ddockerfile.skip=false -Dpush-docker-image=true + + +## Build and push AMD and ARM docker images using docker buildx +## Reference to article how to setup docker miltiplatform build environment: https://medium.com/@artur.klauser/building-multi-architecture-docker-images-with-buildx-27d80f7e2408 +## install docker-ce from docker repo https://docs.docker.com/engine/install/ubuntu/ +# sudo apt install -y qemu-user-static binfmt-support +# export DOCKER_CLI_EXPERIMENTAL=enabled +# docker version +# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes +# docker buildx create --name mybuilder +# docker buildx use mybuilder +# docker buildx inspect --bootstrap +# docker buildx ls +# mvn clean install -P push-docker-amd-arm-images \ No newline at end of file diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index e33e13caa1..dd371d96b6 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -51,6 +51,7 @@ public abstract class RedisTbTransactionalCache keySerializer = StringRedisSerializer.UTF_8; private final TbRedisSerializer valueSerializer; @@ -116,7 +117,7 @@ public abstract class RedisTbTransactionalCache implements TbRedisSerializer { + + final RedisSerializer serializer = RedisSerializer.java(); + + @Override + public byte[] serialize(V value) throws SerializationException { + return serializer.serialize(value); + } + + @Override + public V deserialize(K key, byte[] bytes) throws SerializationException { + return (V) serializer.deserialize(bytes); + } + +} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/util/SqlTsLatestAnyDaoCachedRedis.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/SqlTsLatestAnyDaoCachedRedis.java new file mode 100644 index 0000000000..634302a1f3 --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/SqlTsLatestAnyDaoCachedRedis.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) +@ConditionalOnExpression("('${database.ts_latest.type}'=='sql' || '${database.ts_latest.type}'=='timescale') && '${cache.ts_latest.enabled:false}'=='true' && '${cache.type:caffeine}'=='redis' ") +public @interface SqlTsLatestAnyDaoCachedRedis { +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java index c95259aec5..2de57f36ac 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java @@ -36,6 +36,7 @@ public class CacheConstants { public static final String ASSET_PROFILE_CACHE = "assetProfiles"; public static final String ATTRIBUTES_CACHE = "attributes"; + public static final String TS_LATEST_CACHE = "tsLatest"; public static final String USERS_SESSION_INVALIDATION_CACHE = "userSessionsInvalidation"; public static final String OTA_PACKAGE_CACHE = "otaPackages"; public static final String OTA_PACKAGE_DATA_CACHE = "otaPackagesData"; diff --git a/dao/pom.xml b/dao/pom.xml index 061c130a69..28313c64ce 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -216,6 +216,11 @@ jdbc test + + org.testcontainers + junit-jupiter + test + org.springframework spring-context-support diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java new file mode 100644 index 0000000000..32d0ee91c9 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -0,0 +1,181 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; +import org.thingsboard.server.cache.TbCacheValueWrapper; +import org.thingsboard.server.cache.TbTransactionalCache; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; +import org.thingsboard.server.common.stats.DefaultCounter; +import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.dao.cache.CacheExecutorService; +import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; +import org.thingsboard.server.dao.timeseries.TsLatestCacheKey; +import org.thingsboard.server.dao.util.SqlTsLatestAnyDaoCachedRedis; + +import java.util.List; +import java.util.Optional; + +@Slf4j +@Component +@SqlTsLatestAnyDaoCachedRedis +@RequiredArgsConstructor +@Primary +public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao { + public static final String STATS_NAME = "ts_latest.cache"; + final CacheExecutorService cacheExecutorService; + final SqlTimeseriesLatestDao sqlDao; + final StatsFactory statsFactory; + final TbTransactionalCache cache; + DefaultCounter hitCounter; + DefaultCounter missCounter; + + @PostConstruct + public void init() { + log.info("Init Redis cache-aside SQL Timeseries Latest DAO"); + this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit"); + this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); + } + + @Override + public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { + ListenableFuture future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); + future = Futures.transform(future, x -> { + cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry); + return x; + }, + cacheExecutorService); + if (log.isTraceEnabled()) { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Void result) { + log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); + } + + @Override + public void onFailure(Throwable t) { + log.info("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t); + } + }, MoreExecutors.directExecutor()); + } + return future; + } + + @Override + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + ListenableFuture future = sqlDao.removeLatest(tenantId, entityId, query); + future = Futures.transform(future, x -> { + cache.evict(new TsLatestCacheKey(entityId, query.getKey())); + return x; + }, + cacheExecutorService); + if (log.isTraceEnabled()) { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(TsKvLatestRemovingResult result) { + log.trace("removeLatest onSuccess [{}][{}][{}]", entityId, query.getKey(), query); + } + + @Override + public void onFailure(Throwable t) { + log.info("removeLatest onFailure [{}][{}][{}]", entityId, query.getKey(), query, t); + } + }, MoreExecutors.directExecutor()); + } + return future; + } + + @Override + public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { + log.trace("findLatestOpt"); + return doFindLatest(tenantId, entityId, key); + } + + @Override + public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { + return Futures.transform(doFindLatest(tenantId, entityId, key), x -> sqlDao.wrapNullTsKvEntry(key, x.orElse(null)), MoreExecutors.directExecutor()); + } + + public ListenableFuture> doFindLatest(TenantId tenantId, EntityId entityId, String key) { + final TsLatestCacheKey cacheKey = new TsLatestCacheKey(entityId, key); + ListenableFuture> cacheFuture = cacheExecutorService.submit(() -> cache.get(cacheKey)); + + return Futures.transformAsync(cacheFuture, (cacheValueWrap) -> { + if (cacheValueWrap != null) { + final TsKvEntry tsKvEntry = cacheValueWrap.get(); + log.debug("findLatest cache hit [{}][{}][{}]", entityId, key, tsKvEntry); + return Futures.immediateFuture(Optional.ofNullable(tsKvEntry)); + } + log.debug("findLatest cache miss [{}][{}]", entityId, key); + ListenableFuture> daoFuture = sqlDao.findLatestOpt(tenantId,entityId, key); + + return Futures.transformAsync(daoFuture, (daoValue) -> { + + if (daoValue.isEmpty()) { + //TODO implement the cache logic if no latest found in TS DAO. Currently we are always getting from DB to stay on the safe side + return Futures.immediateFuture(daoValue); + } + ListenableFuture> cachePutFuture = cacheExecutorService.submit(() -> { + cache.put(new TsLatestCacheKey(entityId, key), daoValue.get()); + return daoValue; + }); + + Futures.addCallback(cachePutFuture, new FutureCallback<>() { + @Override + public void onSuccess(Optional result) { + log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, key, result); + } + + @Override + public void onFailure(Throwable t) { + log.info("saveLatest onFailure [{}][{}][{}]", entityId, key, daoValue, t); + } + + }, MoreExecutors.directExecutor()); + return cachePutFuture; + }, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); + } + + @Override + public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { + return sqlDao.findAllLatest(tenantId, entityId); + } + + @Override + public List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { + return sqlDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId); + } + + @Override + public List findAllKeysByEntityIds(TenantId tenantId, List entityIds) { + return sqlDao.findAllKeysByEntityIds(tenantId, entityIds); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 513414b8ac..edb0e535d1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -157,12 +157,13 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme @Override public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { - return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key))); + return service.submit(() -> Optional.ofNullable(doFindLatestSync(entityId, key))); } @Override public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return service.submit(() -> getLatestTsKvEntry(entityId, key)); + log.trace("findLatest [{}][{}][{}]", tenantId, entityId, key); + return service.submit(() -> wrapNullTsKvEntry(key, doFindLatestSync(entityId, key))); } @Override @@ -206,7 +207,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme ReadTsKvQueryResult::getData, MoreExecutors.directExecutor()); } - protected TsKvEntry doFindLatest(EntityId entityId, String key) { + protected TsKvEntry doFindLatestSync(EntityId entityId, String key) { TsKvLatestCompositeKey compositeKey = new TsKvLatestCompositeKey( entityId.getId(), @@ -222,7 +223,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - ListenableFuture latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey())); + ListenableFuture latestFuture = service.submit(() -> doFindLatestSync(entityId, query.getKey())); return Futures.transformAsync(latestFuture, latest -> { if (latest == null) { return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); @@ -263,10 +264,9 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return tsLatestQueue.add(latestEntity); } - private TsKvEntry getLatestTsKvEntry(EntityId entityId, String key) { - TsKvEntry latest = doFindLatest(entityId, key); + protected TsKvEntry wrapNullTsKvEntry(final String key, final TsKvEntry latest) { if (latest == null) { - latest = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null)); + return new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null)); } return latest; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestCacheKey.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestCacheKey.java new file mode 100644 index 0000000000..adb572922a --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestCacheKey.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.EntityId; + +import java.io.Serial; +import java.io.Serializable; + +@EqualsAndHashCode +@Getter +@AllArgsConstructor +public class TsLatestCacheKey implements Serializable { + private static final long serialVersionUID = 2024369077925351881L; + + private final EntityId entityId; + private final String key; + + @Override + public String toString() { + return "{" + entityId + "}" + key; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java new file mode 100644 index 0000000000..e9f9ab522c --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -0,0 +1,148 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.NotImplementedException; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.dao.InvalidDataAccessApiUsageException; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cache.CacheSpecsMap; +import org.thingsboard.server.cache.RedisTbTransactionalCache; +import org.thingsboard.server.cache.TBRedisCacheConfiguration; +import org.thingsboard.server.cache.TbCacheTransaction; +import org.thingsboard.server.cache.TbCacheValueWrapper; +import org.thingsboard.server.cache.TbJavaRedisSerializer; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") +@Service("TsLatestCache") +@Slf4j +public class TsLatestRedisCache extends RedisTbTransactionalCache { + + static final byte[] UPSERT_TS_LATEST_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("" + + "redis.call('ZREMRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[1]); " + + "redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]); " + + "local current_size = redis.call('ZCARD', KEYS[1]); " + + "if current_size > 1 then" + + " redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -2) " + + "end;"); + static final byte[] UPSERT_TS_LATEST_SHA = StringRedisSerializer.UTF_8.serialize("24e226c3ea34e3e850113e8eb1f3cd2b88171988"); + + public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { + super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); + } + + @PostConstruct + public void init() { + try (var connection = getConnection(UPSERT_TS_LATEST_SHA)) { + log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); + if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), sha, connection.getNativeConnection()); + } + } catch (Throwable t) { + log.error("Error on Redis TS Latest cache init", t); + } + } + + @Override + public TbCacheValueWrapper get(TsLatestCacheKey key) { + log.debug("get [{}]", key); + return super.get(key); + } + + @Override + protected byte[] doGet(RedisConnection connection, byte[] rawKey) { + log.trace("doGet [{}][{}]", connection, rawKey); + Set values = connection.commands().zRange(rawKey, -1, -1); + return values == null ? null : values.stream().findFirst().orElse(null); + } + + @Override + public void put(TsLatestCacheKey key, TsKvEntry value) { + log.trace("put [{}][{}]", key, value); + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + byte[] rawValue = getRawValue(value); + byte[] ts = StringRedisSerializer.UTF_8.serialize(String.valueOf(value.toTsValue().getTs())); + try { + connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } catch (InvalidDataAccessApiUsageException e) { + log.debug("loading LUA [{}]", connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); + if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(UPSERT_TS_LATEST_SHA), sha); + } + try { + connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } catch (InvalidDataAccessApiUsageException ignored) { + log.debug("Slowly executing eval instead of fast evalsha"); + connection.scriptingCommands().eval(UPSERT_TS_LATEST_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } + + } + } + } + + @Override + public void evict(TsLatestCacheKey key) { + log.trace("evict [{}]", key); + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + connection.keyCommands().del(rawKey); + } + } + + @Override + public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { + log.trace("putIfAbsent [{}][{}]", key, value); + throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); + } + + @Override + public void evict(Collection keys) { + throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache"); + } + + @Override + public void evictOrPut(TsLatestCacheKey key, TsKvEntry value) { + throw new NotImplementedException("evictOrPut is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheTransaction newTransactionForKey(TsLatestCacheKey key) { + throw new NotImplementedException("newTransactionForKey is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheTransaction newTransactionForKeys(List keys) { + throw new NotImplementedException("newTransactionForKeys is not supported by TsLatestRedisCache"); + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java b/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java new file mode 100644 index 0000000000..a95f4b8c17 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java @@ -0,0 +1,90 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import lombok.extern.slf4j.Slf4j; +import org.junit.ClassRule; +import org.junit.rules.ExternalResource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.OutputFrame; +import redis.clients.jedis.Jedis; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class AbstractRedisClusterContainer { + + static final String nodes = "127.0.0.1:6371,127.0.0.1:6372,127.0.0.1:6373,127.0.0.1:6374,127.0.0.1:6375,127.0.0.1:6376"; + + @ClassRule(order = 0) + public static Network network = Network.newNetwork(); + @ClassRule(order = 1) + public static GenericContainer redis1 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6371").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 2) + public static GenericContainer redis2 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6372").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 3) + public static GenericContainer redis3 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6373").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 4) + public static GenericContainer redis4 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6374").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 5) + public static GenericContainer redis5 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6375").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 6) + public static GenericContainer redis6 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6376").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + + + @ClassRule(order = 100) + public static ExternalResource resource = new ExternalResource() { + @Override + protected void before() throws Throwable { + redis1.start(); + redis2.start(); + redis3.start(); + redis4.start(); + redis5.start(); + redis6.start(); + + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // otherwise not all containers have time to start + + String clusterCreateCommand = "echo yes | redis-cli --cluster create " + + "127.0.0.1:6371 127.0.0.1:6372 127.0.0.1:6373 127.0.0.1:6374 127.0.0.1:6375 127.0.0.1:6376 " + + "--cluster-replicas 1"; + log.warn("Command to init Redis Cluster: {}", clusterCreateCommand); + var result = redis6.execInContainer("/bin/sh", "-c", clusterCreateCommand); + log.warn("Init cluster result: {}", result); + + log.warn("Connect to nodes: {}", nodes); + System.setProperty("cache.type", "redis"); + System.setProperty("redis.connection.type", "cluster"); + System.setProperty("redis.cluster.nodes", nodes); + System.setProperty("redis.cluster.useDefaultPoolConfig", "false"); + } + + @Override + protected void after() { + redis1.stop(); + redis2.stop(); + redis3.stop(); + redis4.stop(); + redis5.stop(); + redis6.stop(); + List.of("cache.type", "redis.connection.type", "redis.cluster.nodes", "redis.cluster.useDefaultPoolConfig\"") + .forEach(System.getProperties()::remove); + } + }; + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/RedisClusterSqlTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/RedisClusterSqlTestSuite.java new file mode 100644 index 0000000000..be0bcc6fc7 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/RedisClusterSqlTestSuite.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import org.junit.extensions.cpsuite.ClasspathSuite; +import org.junit.extensions.cpsuite.ClasspathSuite.ClassnameFilters; +import org.junit.runner.RunWith; + +@RunWith(ClasspathSuite.class) +@ClassnameFilters( + //All the same tests using redis instead of caffeine. + { + "org.thingsboard.server.dao.service.*ServiceSqlTest", + } +) +public class RedisClusterSqlTestSuite extends AbstractRedisClusterContainer { + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/RedisJUnit5Test.java b/dao/src/test/java/org/thingsboard/server/dao/RedisJUnit5Test.java new file mode 100644 index 0000000000..43e788cccb --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/RedisJUnit5Test.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +@Slf4j +public class RedisJUnit5Test { + + @Container + private static final GenericContainer REDIS = new GenericContainer("redis:7.2-bookworm") + .withLogConsumer(s -> log.error(((OutputFrame) s).getUtf8String().trim())) + .withExposedPorts(6379); + + @BeforeAll + static void beforeAll() { + log.warn("Starting redis..."); + REDIS.start(); + System.setProperty("cache.type", "redis"); + System.setProperty("redis.connection.type", "standalone"); + System.setProperty("redis.standalone.host", REDIS.getHost()); + System.setProperty("redis.standalone.port", String.valueOf(REDIS.getMappedPort(6379))); + + } + + @AfterAll + static void afterAll() { + List.of("cache.type", "redis.connection.type", "redis.standalone.host", "redis.standalone.port") + .forEach(System.getProperties()::remove); + REDIS.stop(); + log.warn("Redis is stopped"); + } + + @Test + void test() { + assertThat(REDIS.isRunning()).isTrue(); + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 390cb9afac..d32933684a 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.service.timeseries; import com.datastax.oss.driver.api.core.uuid.Uuids; import lombok.extern.slf4j.Slf4j; +import org.assertj.core.data.Offset; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; @@ -50,15 +52,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; /** * @author Andrew Shvayka @@ -89,6 +90,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE); protected TenantId tenantId; + DeviceId deviceId = new DeviceId(Uuids.timeBased()); @Before public void before() { @@ -106,8 +108,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindAllLatest() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); saveEntries(deviceId, TS); @@ -150,8 +150,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindLatest() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); saveEntries(deviceId, TS); @@ -162,9 +160,71 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { } @Test - public void testFindLatestWithoutLatestUpdate() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); + public void testFindLatestOpt_givenSaveWithHistoricalNonOrderedTS() throws Exception { + save(tenantId, deviceId, toTsEntry(TS - 1, stringKvEntry)); + save(tenantId, deviceId, toTsEntry(TS, stringKvEntry)); + save(tenantId, deviceId, toTsEntry(TS - 10, stringKvEntry)); + save(tenantId, deviceId, toTsEntry(TS - 11, stringKvEntry)); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.orElse(null)); + } + + @Test + public void testFindLatestOpt_givenSaveWithSameTSOverwriteValue() throws Exception { + save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry(STRING_KEY, "old"))); + save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry(STRING_KEY, "new"))); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.orElse(null)); + } + public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception { + save(tenantId, deviceId, toTsEntry(TS, new JsonDataEntry("temp", "{\"hello\":\"world\"}"))); + save(tenantId, deviceId, toTsEntry(TS, new BooleanDataEntry("temp", true))); + save(tenantId, deviceId, toTsEntry(TS, new LongDataEntry("temp", 100L))); + save(tenantId, deviceId, toTsEntry(TS, new DoubleDataEntry("temp", Math.PI))); + save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry("temp", "NOOP"))); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, new StringDataEntry("temp", "NOOP")), entryOpt.orElse(null)); + } + + @Test + public void testFindLatestOpt() throws Exception { + saveEntries(deviceId, TS - 2); + saveEntries(deviceId, TS - 1); + saveEntries(deviceId, TS); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.get()); + } + + @Test + public void testFindLatest_NotFound() throws Exception { + List entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entries).hasSize(1); + TsKvEntry tsKvEntry = entries.get(0); + assertThat(tsKvEntry).isNotNull(); + // null ts latest representation + assertThat(tsKvEntry.getKey()).isEqualTo(STRING_KEY); + assertThat(tsKvEntry.getDataType()).isEqualTo(DataType.STRING); + assertThat(tsKvEntry.getValue()).isNull(); + assertThat(tsKvEntry.getTs()).isCloseTo(System.currentTimeMillis(), Offset.offset(TimeUnit.MINUTES.toMillis(1))); + } + + @Test + public void testFindLatestOpt_NotFound() throws Exception { + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isNotPresent(); + } + + @Test + public void testFindLatestWithoutLatestUpdate() throws Exception { saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); saveEntriesWithoutLatest(deviceId, TS); @@ -176,8 +236,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQueryAscOrder() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 3); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); @@ -202,7 +260,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodEqualsOneMilisecondPeriod() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); saveEntries(deviceId, TS); saveEntries(deviceId, TS + 1L); @@ -222,7 +279,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodEqualsInterval() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); for (long i = TS; i <= TS + 100L; i += 10L) { saveEntries(deviceId, i); @@ -244,7 +300,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); for (long i = TS; i <= TS + 100000L; i += 10000L) { saveEntries(deviceId, i); @@ -268,7 +323,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirst() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); for (long i = TS; i <= TS + 80000L; i += 10000L) { saveEntries(deviceId, i); @@ -292,7 +346,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength_whereNotAllEntriesInRange() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); for (long i = TS - 1L; i <= TS + 100000L + 1L; i += 10000) { saveEntries(deviceId, i); } @@ -314,7 +367,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirst_andNotAllEntriesInRange() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); for (long i = TS - 1L; i <= TS + 100000L + 1L; i += 10000L) { saveEntries(deviceId, i); } @@ -336,8 +388,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQueryDescOrder() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 3); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); @@ -362,7 +412,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindAllByQueries_verifyQueryId() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 10); @@ -373,7 +422,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindAllByQueries_verifyQueryId_forEntityView() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 12); @@ -392,8 +440,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, 10000); saveEntries(deviceId, 20000); saveEntries(deviceId, 30000); @@ -412,7 +458,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindDeviceTsData() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); List entries = new ArrayList<>(); entries.add(save(deviceId, 5000, 100)); @@ -563,7 +608,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindDeviceLongAndDoubleTsData() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); List entries = new ArrayList<>(); entries.add(save(deviceId, 5000, 100)); @@ -654,8 +698,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testSaveTs_RemoveTs_AndSaveTsAgain() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - save(deviceId, 2000000L, 95); save(deviceId, 4000000L, 100); save(deviceId, 6000000L, 105); @@ -686,7 +728,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { BasicTsKvEntry jsonEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}")); List timeseries = List.of(booleanEntry, stringEntry, longEntry, doubleEntry, jsonEntry); - DeviceId deviceId = new DeviceId(Uuids.timeBased()); for (TsKvEntry tsKvEntry : timeseries) { save(tenantId, deviceId, tsKvEntry); } diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java new file mode 100644 index 0000000000..c463b4630f --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +import java.security.MessageDigest; + +import static org.assertj.core.api.Assertions.assertThat; + +class TsLatestRedisCacheTest { + + @Test + void testUpsertTsLatestLUAScriptHash() { + assertThat(getSHA1(TsLatestRedisCache.UPSERT_TS_LATEST_LUA_SCRIPT)).isEqualTo(new String(TsLatestRedisCache.UPSERT_TS_LATEST_SHA)); + } + + @SneakyThrows + String getSHA1(byte[] script) { + MessageDigest md = MessageDigest.getInstance("SHA-1"); + byte[] hash = md.digest(script); + + StringBuilder sb = new StringBuilder(); + for (byte b : hash) { + sb.append(String.format("%02x", b)); + } + + return sb.toString(); + } + +} \ No newline at end of file diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index be8a69f689..0891123bc4 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -10,6 +10,7 @@ audit-log.sink.type=none #cache.type=caffeine # will be injected redis by RedisContainer or will be default (caffeine) cache.maximumPoolSize=16 cache.attributes.enabled=true +cache.ts_latest.enabled=true cache.specs.relations.timeToLiveInMinutes=1440 cache.specs.relations.maxSize=100000 @@ -59,6 +60,9 @@ cache.specs.assetProfiles.maxSize=100000 cache.specs.attributes.timeToLiveInMinutes=1440 cache.specs.attributes.maxSize=100000 +cache.specs.tsLatest.timeToLiveInMinutes=1440 +cache.specs.tsLatest.maxSize=100000 + cache.specs.tokensOutdatageTime.timeToLiveInMinutes=1440 cache.specs.tokensOutdatageTime.maxSize=100000 diff --git a/dao/src/test/resources/logback.xml b/dao/src/test/resources/logback-test.xml similarity index 65% rename from dao/src/test/resources/logback.xml rename to dao/src/test/resources/logback-test.xml index 4f7a2142df..fe0887c678 100644 --- a/dao/src/test/resources/logback.xml +++ b/dao/src/test/resources/logback-test.xml @@ -9,6 +9,11 @@ + + + + + diff --git a/pom.xml b/pom.xml index f75e782bef..6130dbd3aa 100755 --- a/pom.xml +++ b/pom.xml @@ -2162,6 +2162,18 @@ ${testcontainers-junit4-mock.version} test + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + junit + junit + + + org.zeroturnaround zt-exec