committed by
GitHub
104 changed files with 2537 additions and 1074 deletions
@ -0,0 +1,57 @@ |
|||
#!/bin/bash |
|||
# |
|||
# Copyright © 2016-2024 The Thingsboard Authors |
|||
# |
|||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|||
# you may not use this file except in compliance with the License. |
|||
# You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software |
|||
# distributed under the License is distributed on an "AS IS" BASIS, |
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
# See the License for the specific language governing permissions and |
|||
# limitations under the License. |
|||
# |
|||
|
|||
set -e # exit on any error |
|||
|
|||
#PROJECTS="msa/tb-node,msa/web-ui,rule-engine-pe/rule-node-twilio-sms" |
|||
PROJECTS="" |
|||
|
|||
if [ "$1" ]; then |
|||
PROJECTS="--projects $1" |
|||
fi |
|||
|
|||
echo "Building and pushing [amd64,arm64] projects '$PROJECTS' ..." |
|||
echo "HELP: usage ./build.sh [projects]" |
|||
echo "HELP: example ./build.sh msa/web-ui,msa/web-report" |
|||
java -version |
|||
#echo "Cleaning ui-ngx/node_modules" && rm -rf ui-ngx/node_modules |
|||
|
|||
MAVEN_OPTS="-Xmx1024m" NODE_OPTIONS="--max_old_space_size=4096" DOCKER_CLI_EXPERIMENTAL=enabled DOCKER_BUILDKIT=0 \ |
|||
mvn -T2 license:format clean install -DskipTests \ |
|||
$PROJECTS --also-make |
|||
# \ |
|||
# -Dpush-docker-amd-arm-images |
|||
# -Ddockerfile.skip=false -Dpush-docker-image=true |
|||
# --offline |
|||
# --projects '!msa/web-report' --also-make |
|||
|
|||
# push all |
|||
# mvn -T 1C license:format clean install -DskipTests -Ddockerfile.skip=false -Dpush-docker-image=true |
|||
|
|||
|
|||
## Build and push AMD and ARM docker images using docker buildx |
|||
## Reference to article how to setup docker miltiplatform build environment: https://medium.com/@artur.klauser/building-multi-architecture-docker-images-with-buildx-27d80f7e2408 |
|||
## install docker-ce from docker repo https://docs.docker.com/engine/install/ubuntu/ |
|||
# sudo apt install -y qemu-user-static binfmt-support |
|||
# export DOCKER_CLI_EXPERIMENTAL=enabled |
|||
# docker version |
|||
# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes |
|||
# docker buildx create --name mybuilder |
|||
# docker buildx use mybuilder |
|||
# docker buildx inspect --bootstrap |
|||
# docker buildx ls |
|||
# mvn clean install -P push-docker-amd-arm-images |
|||
@ -0,0 +1,35 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache; |
|||
|
|||
import org.springframework.data.redis.serializer.RedisSerializer; |
|||
import org.springframework.data.redis.serializer.SerializationException; |
|||
|
|||
public class TbJavaRedisSerializer<K, V> implements TbRedisSerializer<K, V> { |
|||
|
|||
final RedisSerializer<Object> serializer = RedisSerializer.java(); |
|||
|
|||
@Override |
|||
public byte[] serialize(V value) throws SerializationException { |
|||
return serializer.serialize(value); |
|||
} |
|||
|
|||
@Override |
|||
public V deserialize(K key, byte[] bytes) throws SerializationException { |
|||
return (V) serializer.deserialize(bytes); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,95 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache; |
|||
|
|||
import org.springframework.cache.Cache; |
|||
import org.springframework.cache.CacheManager; |
|||
import org.thingsboard.server.common.data.HasVersion; |
|||
import org.thingsboard.server.common.data.util.TbPair; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends Serializable & HasVersion> extends CaffeineTbTransactionalCache<K, V> implements VersionedTbCache<K, V> { |
|||
|
|||
public VersionedCaffeineTbCache(CacheManager cacheManager, String cacheName) { |
|||
super(cacheManager, cacheName); |
|||
} |
|||
|
|||
@Override |
|||
public TbCacheValueWrapper<V> get(K key) { |
|||
TbPair<Long, V> versionValuePair = doGet(key); |
|||
if (versionValuePair != null) { |
|||
return SimpleTbCacheValueWrapper.wrap(versionValuePair.getSecond()); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
@Override |
|||
public void put(K key, V value) { |
|||
Long version = value != null ? value.getVersion() : 0; |
|||
doPut(key, value, version); |
|||
} |
|||
|
|||
private void doPut(K key, V value, Long version) { |
|||
if (version == null) { |
|||
return; |
|||
} |
|||
lock.lock(); |
|||
try { |
|||
TbPair<Long, V> versionValuePair = doGet(key); |
|||
if (versionValuePair == null || version > versionValuePair.getFirst()) { |
|||
failAllTransactionsByKey(key); |
|||
cache.put(key, wrapValue(value, version)); |
|||
} |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
private TbPair<Long, V> doGet(K key) { |
|||
Cache.ValueWrapper source = cache.get(key); |
|||
return source == null ? null : (TbPair<Long, V>) source.get(); |
|||
} |
|||
|
|||
@Override |
|||
public void evict(K key) { |
|||
lock.lock(); |
|||
try { |
|||
failAllTransactionsByKey(key); |
|||
cache.evict(key); |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void evict(K key, Long version) { |
|||
if (version == null) { |
|||
return; |
|||
} |
|||
doPut(key, null, version); |
|||
} |
|||
|
|||
@Override |
|||
void doPutIfAbsent(K key, V value) { |
|||
cache.putIfAbsent(key, wrapValue(value, value != null ? value.getVersion() : 0)); |
|||
} |
|||
|
|||
private TbPair<Long, V> wrapValue(V value, Long version) { |
|||
return TbPair.of(version, value); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,181 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache; |
|||
|
|||
import jakarta.annotation.PostConstruct; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.NotImplementedException; |
|||
import org.springframework.dao.InvalidDataAccessApiUsageException; |
|||
import org.springframework.data.redis.connection.RedisConnection; |
|||
import org.springframework.data.redis.connection.RedisConnectionFactory; |
|||
import org.springframework.data.redis.connection.ReturnType; |
|||
import org.springframework.data.redis.core.types.Expiration; |
|||
import org.springframework.data.redis.serializer.StringRedisSerializer; |
|||
import org.thingsboard.server.common.data.HasVersion; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.Arrays; |
|||
|
|||
@Slf4j |
|||
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> { |
|||
|
|||
private static final int VERSION_SIZE = 8; |
|||
private static final int VALUE_END_OFFSET = -1; |
|||
|
|||
static final byte[] SET_VERSIONED_VALUE_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize(""" |
|||
local key = KEYS[1] |
|||
local newValue = ARGV[1] |
|||
local newVersion = tonumber(ARGV[2]) |
|||
local expiration = tonumber(ARGV[3]) |
|||
|
|||
local function setNewValue() |
|||
local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue |
|||
redis.call('SET', key, newValueWithVersion, 'EX', expiration) |
|||
end |
|||
|
|||
local function bytes_to_number(bytes) |
|||
local n = 0 |
|||
for i = 1, 8 do |
|||
n = n * 256 + string.byte(bytes, i) |
|||
end |
|||
return n |
|||
end |
|||
|
|||
-- Get the current version (first 8 bytes) of the current value |
|||
local currentVersionBytes = redis.call('GETRANGE', key, 0, 7) |
|||
|
|||
if currentVersionBytes and #currentVersionBytes == 8 then |
|||
local currentVersion = bytes_to_number(currentVersionBytes) |
|||
|
|||
if newVersion > currentVersion then |
|||
setNewValue() |
|||
end |
|||
else |
|||
-- If the current value is absent or the current version is not found, set the new value |
|||
setNewValue() |
|||
end |
|||
"""); |
|||
static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("80e56cbbbb4bd9cb150d6537f1e7d8df4fddb252"); |
|||
|
|||
public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) { |
|||
super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer); |
|||
} |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
try (var connection = getConnection(SET_VERSIONED_VALUE_SHA)) { |
|||
log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), connection.getNativeConnection()); |
|||
String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT); |
|||
if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { |
|||
log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), sha, connection.getNativeConnection()); |
|||
} |
|||
} catch (Throwable t) { |
|||
log.error("Error on Redis versioned cache init", t); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected byte[] doGet(RedisConnection connection, byte[] rawKey, boolean transactionMode) { |
|||
if (transactionMode) { |
|||
return super.doGet(connection, rawKey, true); |
|||
} |
|||
return connection.stringCommands().getRange(rawKey, VERSION_SIZE, VALUE_END_OFFSET); |
|||
} |
|||
|
|||
@Override |
|||
public void put(K key, V value) { |
|||
Long version = getVersion(value); |
|||
if (version == null) { |
|||
return; |
|||
} |
|||
doPut(key, value, version, cacheTtl); |
|||
} |
|||
|
|||
@Override |
|||
public void put(K key, V value, RedisConnection connection, boolean transactionMode) { |
|||
if (transactionMode) { |
|||
super.put(key, value, connection, true); // because scripting commands are not supported in transaction mode
|
|||
return; |
|||
} |
|||
Long version = getVersion(value); |
|||
if (version == null) { |
|||
return; |
|||
} |
|||
byte[] rawKey = getRawKey(key); |
|||
doPut(rawKey, value, version, cacheTtl, connection); |
|||
} |
|||
|
|||
private void doPut(K key, V value, Long version, Expiration expiration) { |
|||
if (!cacheEnabled) { |
|||
return; |
|||
} |
|||
log.trace("put [{}][{}][{}]", key, value, version); |
|||
final byte[] rawKey = getRawKey(key); |
|||
try (var connection = getConnection(rawKey)) { |
|||
doPut(rawKey, value, version, expiration, connection); |
|||
} |
|||
} |
|||
|
|||
private void doPut(byte[] rawKey, V value, Long version, Expiration expiration, RedisConnection connection) { |
|||
byte[] rawValue = getRawValue(value); |
|||
byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version)); |
|||
byte[] rawExpiration = StringRedisSerializer.UTF_8.serialize(String.valueOf(expiration.getExpirationTimeInSeconds())); |
|||
try { |
|||
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration); |
|||
} catch (InvalidDataAccessApiUsageException e) { |
|||
log.debug("loading LUA [{}]", connection.getNativeConnection()); |
|||
String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT); |
|||
if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { |
|||
log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(SET_VERSIONED_VALUE_SHA), sha); |
|||
} |
|||
try { |
|||
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration); |
|||
} catch (InvalidDataAccessApiUsageException ignored) { |
|||
log.debug("Slowly executing eval instead of fast evalsha"); |
|||
connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration); |
|||
} |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void evict(K key, Long version) { |
|||
log.trace("evict [{}][{}]", key, version); |
|||
if (version != null) { |
|||
doPut(key, null, version, evictExpiration); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void putIfAbsent(K key, V value) { |
|||
throw new NotImplementedException("putIfAbsent is not supported by versioned cache"); |
|||
} |
|||
|
|||
@Override |
|||
public void evictOrPut(K key, V value) { |
|||
throw new NotImplementedException("evictOrPut is not supported by versioned cache"); |
|||
} |
|||
|
|||
private Long getVersion(V value) { |
|||
if (value == null) { |
|||
return 0L; |
|||
} else if (value.getVersion() != null) { |
|||
return value.getVersion(); |
|||
} else { |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,53 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache; |
|||
|
|||
import org.thingsboard.server.common.data.HasVersion; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.Collection; |
|||
import java.util.Optional; |
|||
import java.util.function.Supplier; |
|||
|
|||
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> { |
|||
|
|||
TbCacheValueWrapper<V> get(K key); |
|||
|
|||
default V get(K key, Supplier<V> supplier) { |
|||
return get(key, supplier, true); |
|||
} |
|||
|
|||
default V get(K key, Supplier<V> supplier, boolean putToCache) { |
|||
return Optional.ofNullable(get(key)) |
|||
.map(TbCacheValueWrapper::get) |
|||
.orElseGet(() -> { |
|||
V value = supplier.get(); |
|||
if (putToCache) { |
|||
put(key, value); |
|||
} |
|||
return value; |
|||
}); |
|||
} |
|||
|
|||
void put(K key, V value); |
|||
|
|||
void evict(K key); |
|||
|
|||
void evict(Collection<K> keys); |
|||
|
|||
void evict(K key, Long version); |
|||
|
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache; |
|||
|
|||
import lombok.SneakyThrows; |
|||
import org.junit.jupiter.api.Test; |
|||
|
|||
import java.security.MessageDigest; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
class VersionedRedisTbCacheTest { |
|||
|
|||
@Test |
|||
void testUpsertTsLatestLUAScriptHash() { |
|||
assertThat(getSHA1(VersionedRedisTbCache.SET_VERSIONED_VALUE_LUA_SCRIPT)).isEqualTo(new String(VersionedRedisTbCache.SET_VERSIONED_VALUE_SHA)); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
String getSHA1(byte[] script) { |
|||
MessageDigest md = MessageDigest.getInstance("SHA-1"); |
|||
byte[] hash = md.digest(script); |
|||
|
|||
StringBuilder sb = new StringBuilder(); |
|||
for (byte b : hash) { |
|||
sb.append(String.format("%02x", b)); |
|||
} |
|||
|
|||
return sb.toString(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.util; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
|
|||
import java.lang.annotation.Retention; |
|||
import java.lang.annotation.RetentionPolicy; |
|||
|
|||
@Retention(RetentionPolicy.RUNTIME) |
|||
@ConditionalOnExpression("('${database.ts_latest.type}'=='sql' || '${database.ts_latest.type}'=='timescale') && '${cache.ts_latest.enabled:false}'=='true' && '${cache.type:caffeine}'=='redis' ") |
|||
public @interface SqlTsLatestAnyDaoCachedRedis { |
|||
} |
|||
@ -0,0 +1,130 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao; |
|||
|
|||
import org.springframework.jdbc.core.BatchPreparedStatementSetter; |
|||
import org.springframework.jdbc.core.PreparedStatementCreator; |
|||
import org.springframework.jdbc.core.SqlProvider; |
|||
import org.springframework.jdbc.support.GeneratedKeyHolder; |
|||
import org.springframework.jdbc.support.KeyHolder; |
|||
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository; |
|||
|
|||
import java.sql.Connection; |
|||
import java.sql.PreparedStatement; |
|||
import java.sql.SQLException; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
|
|||
import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN; |
|||
|
|||
public abstract class AbstractVersionedInsertRepository<T> extends AbstractInsertRepository { |
|||
|
|||
public List<Long> saveOrUpdate(List<T> entities) { |
|||
return transactionTemplate.execute(status -> { |
|||
List<Long> seqNumbers = new ArrayList<>(entities.size()); |
|||
|
|||
KeyHolder keyHolder = new GeneratedKeyHolder(); |
|||
|
|||
int[] updateResult = onBatchUpdate(entities, keyHolder); |
|||
|
|||
List<Map<String, Object>> seqNumbersList = keyHolder.getKeyList(); |
|||
|
|||
int notUpdatedCount = entities.size() - seqNumbersList.size(); |
|||
|
|||
List<Integer> toInsertIndexes = new ArrayList<>(notUpdatedCount); |
|||
List<T> insertEntities = new ArrayList<>(notUpdatedCount); |
|||
int keyHolderIndex = 0; |
|||
for (int i = 0; i < updateResult.length; i++) { |
|||
if (updateResult[i] == 0) { |
|||
insertEntities.add(entities.get(i)); |
|||
seqNumbers.add(null); |
|||
toInsertIndexes.add(i); |
|||
} else { |
|||
seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(VERSION_COLUMN)); |
|||
keyHolderIndex++; |
|||
} |
|||
} |
|||
|
|||
if (insertEntities.isEmpty()) { |
|||
return seqNumbers; |
|||
} |
|||
|
|||
int[] insertResult = onInsertOrUpdate(insertEntities, keyHolder); |
|||
|
|||
seqNumbersList = keyHolder.getKeyList(); |
|||
|
|||
for (int i = 0; i < insertResult.length; i++) { |
|||
if (insertResult[i] != 0) { |
|||
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN)); |
|||
} |
|||
} |
|||
|
|||
return seqNumbers; |
|||
}); |
|||
} |
|||
|
|||
private int[] onBatchUpdate(List<T> entities, KeyHolder keyHolder) { |
|||
return jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getBatchUpdateQuery()), new BatchPreparedStatementSetter() { |
|||
@Override |
|||
public void setValues(PreparedStatement ps, int i) throws SQLException { |
|||
setOnBatchUpdateValues(ps, i, entities); |
|||
} |
|||
|
|||
@Override |
|||
public int getBatchSize() { |
|||
return entities.size(); |
|||
} |
|||
}, keyHolder); |
|||
} |
|||
|
|||
private int[] onInsertOrUpdate(List<T> insertEntities, KeyHolder keyHolder) { |
|||
return jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getInsertOrUpdateQuery()), new BatchPreparedStatementSetter() { |
|||
@Override |
|||
public void setValues(PreparedStatement ps, int i) throws SQLException { |
|||
setOnInsertOrUpdateValues(ps, i, insertEntities); |
|||
} |
|||
|
|||
@Override |
|||
public int getBatchSize() { |
|||
return insertEntities.size(); |
|||
} |
|||
}, keyHolder); |
|||
} |
|||
|
|||
protected abstract void setOnBatchUpdateValues(PreparedStatement ps, int i, List<T> entities) throws SQLException; |
|||
|
|||
protected abstract void setOnInsertOrUpdateValues(PreparedStatement ps, int i, List<T> entities) throws SQLException; |
|||
|
|||
protected abstract String getBatchUpdateQuery(); |
|||
|
|||
protected abstract String getInsertOrUpdateQuery(); |
|||
|
|||
private record SequencePreparedStatementCreator(String sql) implements PreparedStatementCreator, SqlProvider { |
|||
|
|||
private static final String[] COLUMNS = {VERSION_COLUMN}; |
|||
|
|||
@Override |
|||
public PreparedStatement createPreparedStatement(Connection con) throws SQLException { |
|||
return con.prepareStatement(sql, COLUMNS); |
|||
} |
|||
|
|||
@Override |
|||
public String getSql() { |
|||
return this.sql; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,20 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.model; |
|||
|
|||
public interface BaseVersionedEntity { |
|||
long getVersion(); |
|||
} |
|||
@ -0,0 +1,170 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sqlts; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import jakarta.annotation.PostConstruct; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.context.annotation.Primary; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.cache.TbCacheValueWrapper; |
|||
import org.thingsboard.server.cache.VersionedTbCache; |
|||
import org.thingsboard.server.common.data.id.DeviceProfileId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; |
|||
import org.thingsboard.server.common.stats.DefaultCounter; |
|||
import org.thingsboard.server.common.stats.StatsFactory; |
|||
import org.thingsboard.server.dao.cache.CacheExecutorService; |
|||
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; |
|||
import org.thingsboard.server.dao.timeseries.TsLatestCacheKey; |
|||
import org.thingsboard.server.dao.util.SqlTsLatestAnyDaoCachedRedis; |
|||
|
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
@SqlTsLatestAnyDaoCachedRedis |
|||
@RequiredArgsConstructor |
|||
@Primary |
|||
public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao { |
|||
public static final String STATS_NAME = "ts_latest.cache"; |
|||
final CacheExecutorService cacheExecutorService; |
|||
final SqlTimeseriesLatestDao sqlDao; |
|||
final StatsFactory statsFactory; |
|||
final VersionedTbCache<TsLatestCacheKey, TsKvEntry> cache; |
|||
DefaultCounter hitCounter; |
|||
DefaultCounter missCounter; |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
log.info("Init Redis cache-aside SQL Timeseries Latest DAO"); |
|||
this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit"); |
|||
this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { |
|||
ListenableFuture<Long> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); |
|||
future = Futures.transform(future, version -> { |
|||
cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), new BasicTsKvEntry(tsKvEntry.getTs(), ((BasicTsKvEntry) tsKvEntry).getKv(), version)); |
|||
return version; |
|||
}, |
|||
cacheExecutorService); |
|||
if (log.isTraceEnabled()) { |
|||
Futures.addCallback(future, new FutureCallback<>() { |
|||
@Override |
|||
public void onSuccess(Long result) { |
|||
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
log.info("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t); |
|||
} |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
return future; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { |
|||
ListenableFuture<TsKvLatestRemovingResult> future = sqlDao.removeLatest(tenantId, entityId, query); |
|||
future = Futures.transform(future, x -> { |
|||
if (x.isRemoved()) { |
|||
TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey()); |
|||
Long version = x.getVersion(); |
|||
TsKvEntry newTsKvEntry = x.getData(); |
|||
if (newTsKvEntry != null) { |
|||
cache.put(key, new BasicTsKvEntry(newTsKvEntry.getTs(), ((BasicTsKvEntry) newTsKvEntry).getKv(), version)); |
|||
} else { |
|||
cache.evict(key, version); |
|||
} |
|||
} |
|||
return x; |
|||
}, |
|||
cacheExecutorService); |
|||
if (log.isTraceEnabled()) { |
|||
Futures.addCallback(future, new FutureCallback<>() { |
|||
@Override |
|||
public void onSuccess(TsKvLatestRemovingResult result) { |
|||
log.trace("removeLatest onSuccess [{}][{}][{}]", entityId, query.getKey(), query); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
log.info("removeLatest onFailure [{}][{}][{}]", entityId, query.getKey(), query, t); |
|||
} |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
return future; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { |
|||
log.trace("findLatestOpt"); |
|||
return doFindLatest(tenantId, entityId, key); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) { |
|||
return Futures.transform(doFindLatest(tenantId, entityId, key), x -> sqlDao.wrapNullTsKvEntry(key, x.orElse(null)), MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
public ListenableFuture<Optional<TsKvEntry>> doFindLatest(TenantId tenantId, EntityId entityId, String key) { |
|||
final TsLatestCacheKey cacheKey = new TsLatestCacheKey(entityId, key); |
|||
ListenableFuture<TbCacheValueWrapper<TsKvEntry>> cacheFuture = cacheExecutorService.submit(() -> cache.get(cacheKey)); |
|||
|
|||
return Futures.transformAsync(cacheFuture, (cacheValueWrap) -> { |
|||
if (cacheValueWrap != null) { |
|||
final TsKvEntry tsKvEntry = cacheValueWrap.get(); |
|||
log.debug("findLatest cache hit [{}][{}][{}]", entityId, key, tsKvEntry); |
|||
return Futures.immediateFuture(Optional.ofNullable(tsKvEntry)); |
|||
} |
|||
log.debug("findLatest cache miss [{}][{}]", entityId, key); |
|||
ListenableFuture<Optional<TsKvEntry>> daoFuture = sqlDao.findLatestOpt(tenantId, entityId, key); |
|||
|
|||
return Futures.transform(daoFuture, daoValue -> { |
|||
cache.put(cacheKey, daoValue.orElse(null)); |
|||
return daoValue; |
|||
}, MoreExecutors.directExecutor()); |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) { |
|||
return sqlDao.findAllLatest(tenantId, entityId); |
|||
} |
|||
|
|||
@Override |
|||
public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { |
|||
return sqlDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId); |
|||
} |
|||
|
|||
@Override |
|||
public List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds) { |
|||
return sqlDao.findAllKeysByEntityIds(tenantId, entityIds); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.timeseries; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.AttributeScope; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
|
|||
import java.io.Serial; |
|||
import java.io.Serializable; |
|||
|
|||
@EqualsAndHashCode |
|||
@Getter |
|||
@AllArgsConstructor |
|||
public class TsLatestCacheKey implements Serializable { |
|||
private static final long serialVersionUID = 2024369077925351881L; |
|||
|
|||
private final EntityId entityId; |
|||
private final String key; |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return "{" + entityId + "}" + key; |
|||
} |
|||
} |
|||
@ -0,0 +1,55 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.timeseries; |
|||
|
|||
import com.google.protobuf.InvalidProtocolBufferException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|||
import org.springframework.data.redis.connection.RedisConnectionFactory; |
|||
import org.springframework.data.redis.serializer.SerializationException; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.cache.CacheSpecsMap; |
|||
import org.thingsboard.server.cache.TBRedisCacheConfiguration; |
|||
import org.thingsboard.server.cache.TbRedisSerializer; |
|||
import org.thingsboard.server.cache.VersionedRedisTbCache; |
|||
import org.thingsboard.server.common.data.CacheConstants; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.common.util.KvProtoUtil; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
|
|||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") |
|||
@Service("TsLatestCache") |
|||
@Slf4j |
|||
public class TsLatestRedisCache extends VersionedRedisTbCache<TsLatestCacheKey, TsKvEntry> { |
|||
|
|||
public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { |
|||
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { |
|||
@Override |
|||
public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException { |
|||
return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry, tsKvEntry.getVersion()).toByteArray(); |
|||
} |
|||
|
|||
@Override |
|||
public TsKvEntry deserialize(TsLatestCacheKey key, byte[] bytes) throws SerializationException { |
|||
try { |
|||
return KvProtoUtil.fromTsKvProto(TransportProtos.TsKvProto.parseFrom(bytes)); |
|||
} catch (InvalidProtocolBufferException e) { |
|||
throw new SerializationException(e.getMessage()); |
|||
} |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
@ -0,0 +1,91 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.ClassRule; |
|||
import org.junit.rules.ExternalResource; |
|||
import org.testcontainers.containers.GenericContainer; |
|||
import org.testcontainers.containers.Network; |
|||
import org.testcontainers.containers.output.OutputFrame; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Slf4j |
|||
public class AbstractRedisClusterContainer { |
|||
|
|||
static final String nodes = "127.0.0.1:6371,127.0.0.1:6372,127.0.0.1:6373,127.0.0.1:6374,127.0.0.1:6375,127.0.0.1:6376"; |
|||
|
|||
@ClassRule(order = 0) |
|||
public static Network network = Network.newNetwork(); |
|||
@ClassRule(order = 1) |
|||
public static GenericContainer redis1 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER", "6371").withNetworkMode("host").withLogConsumer(x -> log.warn("{}", ((OutputFrame) x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD", "yes").withEnv("REDIS_NODES", nodes); |
|||
@ClassRule(order = 2) |
|||
public static GenericContainer redis2 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER", "6372").withNetworkMode("host").withLogConsumer(x -> log.warn("{}", ((OutputFrame) x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD", "yes").withEnv("REDIS_NODES", nodes); |
|||
@ClassRule(order = 3) |
|||
public static GenericContainer redis3 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER", "6373").withNetworkMode("host").withLogConsumer(x -> log.warn("{}", ((OutputFrame) x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD", "yes").withEnv("REDIS_NODES", nodes); |
|||
@ClassRule(order = 4) |
|||
public static GenericContainer redis4 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER", "6374").withNetworkMode("host").withLogConsumer(x -> log.warn("{}", ((OutputFrame) x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD", "yes").withEnv("REDIS_NODES", nodes); |
|||
@ClassRule(order = 5) |
|||
public static GenericContainer redis5 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER", "6375").withNetworkMode("host").withLogConsumer(x -> log.warn("{}", ((OutputFrame) x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD", "yes").withEnv("REDIS_NODES", nodes); |
|||
@ClassRule(order = 6) |
|||
public static GenericContainer redis6 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER", "6376").withNetworkMode("host").withLogConsumer(x -> log.warn("{}", ((OutputFrame) x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD", "yes").withEnv("REDIS_NODES", nodes); |
|||
|
|||
|
|||
@ClassRule(order = 100) |
|||
public static ExternalResource resource = new ExternalResource() { |
|||
@Override |
|||
protected void before() throws Throwable { |
|||
redis1.start(); |
|||
redis2.start(); |
|||
redis3.start(); |
|||
redis4.start(); |
|||
redis5.start(); |
|||
redis6.start(); |
|||
|
|||
Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // otherwise not all containers have time to start
|
|||
|
|||
String clusterCreateCommand = "echo yes | redis-cli --cluster create " + |
|||
"127.0.0.1:6371 127.0.0.1:6372 127.0.0.1:6373 127.0.0.1:6374 127.0.0.1:6375 127.0.0.1:6376 " + |
|||
"--cluster-replicas 1"; |
|||
log.warn("Command to init Redis Cluster: {}", clusterCreateCommand); |
|||
var result = redis6.execInContainer("/bin/sh", "-c", clusterCreateCommand); |
|||
log.warn("Init cluster result: {}", result); |
|||
|
|||
Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // otherwise cluster not always ready
|
|||
|
|||
log.warn("Connect to nodes: {}", nodes); |
|||
System.setProperty("cache.type", "redis"); |
|||
System.setProperty("redis.connection.type", "cluster"); |
|||
System.setProperty("redis.cluster.nodes", nodes); |
|||
System.setProperty("redis.cluster.useDefaultPoolConfig", "false"); |
|||
} |
|||
|
|||
@Override |
|||
protected void after() { |
|||
redis1.stop(); |
|||
redis2.stop(); |
|||
redis3.stop(); |
|||
redis4.stop(); |
|||
redis5.stop(); |
|||
redis6.stop(); |
|||
List.of("cache.type", "redis.connection.type", "redis.cluster.nodes", "redis.cluster.useDefaultPoolConfig\"") |
|||
.forEach(System.getProperties()::remove); |
|||
} |
|||
}; |
|||
|
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao; |
|||
|
|||
import org.junit.extensions.cpsuite.ClasspathSuite; |
|||
import org.junit.extensions.cpsuite.ClasspathSuite.ClassnameFilters; |
|||
import org.junit.runner.RunWith; |
|||
|
|||
@RunWith(ClasspathSuite.class) |
|||
@ClassnameFilters( |
|||
//All the same tests using redis instead of caffeine.
|
|||
{ |
|||
"org.thingsboard.server.dao.service.*ServiceSqlTest", |
|||
} |
|||
) |
|||
public class RedisClusterSqlTestSuite extends AbstractRedisClusterContainer { |
|||
|
|||
} |
|||
@ -0,0 +1,64 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.jupiter.api.AfterAll; |
|||
import org.junit.jupiter.api.BeforeAll; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.testcontainers.containers.GenericContainer; |
|||
import org.testcontainers.containers.output.OutputFrame; |
|||
import org.testcontainers.junit.jupiter.Container; |
|||
import org.testcontainers.junit.jupiter.Testcontainers; |
|||
|
|||
import java.util.List; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
@Testcontainers |
|||
@Slf4j |
|||
public class RedisJUnit5Test { |
|||
|
|||
@Container |
|||
private static final GenericContainer REDIS = new GenericContainer("redis:7.2-bookworm") |
|||
.withLogConsumer(s -> log.error(((OutputFrame) s).getUtf8String().trim())) |
|||
.withExposedPorts(6379); |
|||
|
|||
@BeforeAll |
|||
static void beforeAll() { |
|||
log.warn("Starting redis..."); |
|||
REDIS.start(); |
|||
System.setProperty("cache.type", "redis"); |
|||
System.setProperty("redis.connection.type", "standalone"); |
|||
System.setProperty("redis.standalone.host", REDIS.getHost()); |
|||
System.setProperty("redis.standalone.port", String.valueOf(REDIS.getMappedPort(6379))); |
|||
|
|||
} |
|||
|
|||
@AfterAll |
|||
static void afterAll() { |
|||
List.of("cache.type", "redis.connection.type", "redis.standalone.host", "redis.standalone.port") |
|||
.forEach(System.getProperties()::remove); |
|||
REDIS.stop(); |
|||
log.warn("Redis is stopped"); |
|||
} |
|||
|
|||
@Test |
|||
void test() { |
|||
assertThat(REDIS.isRunning()).isTrue(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,114 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.service.attributes.sql; |
|||
|
|||
import org.junit.Test; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.thingsboard.server.cache.TbCacheValueWrapper; |
|||
import org.thingsboard.server.cache.VersionedTbCache; |
|||
import org.thingsboard.server.common.data.AttributeScope; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.dao.attributes.AttributeCacheKey; |
|||
import org.thingsboard.server.dao.service.AbstractServiceTest; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
import static org.junit.Assert.assertEquals; |
|||
import static org.junit.Assert.assertNotNull; |
|||
import static org.junit.Assert.assertNull; |
|||
|
|||
@DaoSqlTest |
|||
public class AttributeCacheServiceSqlTest extends AbstractServiceTest { |
|||
|
|||
private static final String TEST_KEY = "key"; |
|||
private static final String TEST_VALUE = "value"; |
|||
private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID()); |
|||
|
|||
@Autowired |
|||
VersionedTbCache<AttributeCacheKey, AttributeKvEntry> cache; |
|||
|
|||
@Test |
|||
public void testPutAndGet() { |
|||
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY); |
|||
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L); |
|||
cache.put(testKey, testValue); |
|||
|
|||
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey); |
|||
assertNotNull(wrapper); |
|||
|
|||
assertEquals(testValue, wrapper.get()); |
|||
|
|||
AttributeKvEntry testValue2 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 2L); |
|||
cache.put(testKey, testValue2); |
|||
|
|||
wrapper = cache.get(testKey); |
|||
assertNotNull(wrapper); |
|||
|
|||
assertEquals(testValue2, wrapper.get()); |
|||
|
|||
AttributeKvEntry testValue3 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 0L); |
|||
cache.put(testKey, testValue3); |
|||
|
|||
wrapper = cache.get(testKey); |
|||
assertNotNull(wrapper); |
|||
|
|||
assertEquals(testValue2, wrapper.get()); |
|||
|
|||
cache.evict(testKey); |
|||
} |
|||
|
|||
@Test |
|||
public void testEvictWithVersion() { |
|||
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY); |
|||
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L); |
|||
cache.put(testKey, testValue); |
|||
|
|||
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey); |
|||
assertNotNull(wrapper); |
|||
|
|||
assertEquals(testValue, wrapper.get()); |
|||
|
|||
cache.evict(testKey, 2L); |
|||
|
|||
wrapper = cache.get(testKey); |
|||
assertNotNull(wrapper); |
|||
|
|||
assertNull(wrapper.get()); |
|||
|
|||
cache.evict(testKey); |
|||
} |
|||
|
|||
@Test |
|||
public void testEvict() { |
|||
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY); |
|||
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L); |
|||
cache.put(testKey, testValue); |
|||
|
|||
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey); |
|||
assertNotNull(wrapper); |
|||
|
|||
assertEquals(testValue, wrapper.get()); |
|||
|
|||
cache.evict(testKey); |
|||
|
|||
wrapper = cache.get(testKey); |
|||
assertNull(wrapper); |
|||
} |
|||
} |
|||
@ -0,0 +1,153 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.service.timeseries.sql; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.ListeningExecutorService; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.RandomStringUtils; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
|||
import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
|||
import org.thingsboard.server.common.data.kv.LongDataEntry; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.dao.service.AbstractServiceTest; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Random; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicLong; |
|||
|
|||
@DaoSqlTest |
|||
@Slf4j |
|||
public class LatestTimeseriesPerformanceTest extends AbstractServiceTest { |
|||
|
|||
private static final String STRING_KEY = "stringKey"; |
|||
private static final String LONG_KEY = "longKey"; |
|||
private static final String DOUBLE_KEY = "doubleKey"; |
|||
private static final String BOOLEAN_KEY = "booleanKey"; |
|||
public static final int AMOUNT_OF_UNIQ_KEY = 10000; |
|||
|
|||
private final Random random = new Random(); |
|||
|
|||
@Autowired |
|||
private TimeseriesLatestDao timeseriesLatestDao; |
|||
|
|||
private ListeningExecutorService testExecutor; |
|||
|
|||
private EntityId entityId; |
|||
|
|||
private AtomicLong saveCounter; |
|||
|
|||
@Before |
|||
public void before() { |
|||
Tenant tenant = new Tenant(); |
|||
tenant.setTitle("My tenant"); |
|||
Tenant savedTenant = tenantService.saveTenant(tenant); |
|||
Assert.assertNotNull(savedTenant); |
|||
tenantId = savedTenant.getId(); |
|||
entityId = new DeviceId(UUID.randomUUID()); |
|||
saveCounter = new AtomicLong(0); |
|||
testExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(200, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"))); |
|||
} |
|||
|
|||
@After |
|||
public void after() { |
|||
tenantService.deleteTenant(tenantId); |
|||
if (testExecutor != null) { |
|||
testExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void test_save_latest_timeseries() throws Exception { |
|||
warmup(); |
|||
saveCounter.set(0); |
|||
|
|||
long startTime = System.currentTimeMillis(); |
|||
List<ListenableFuture<?>> futures = new ArrayList<>(); |
|||
for (int i = 0; i < 25_000; i++) { |
|||
futures.add(save(generateStrEntry(getRandomKey()))); |
|||
futures.add(save(generateLngEntry(getRandomKey()))); |
|||
futures.add(save(generateDblEntry(getRandomKey()))); |
|||
futures.add(save(generateBoolEntry(getRandomKey()))); |
|||
} |
|||
Futures.allAsList(futures).get(60, TimeUnit.SECONDS); |
|||
long endTime = System.currentTimeMillis(); |
|||
|
|||
long totalTime = endTime - startTime; |
|||
|
|||
log.info("Total time: {}", totalTime); |
|||
log.info("Saved count: {}", saveCounter.get()); |
|||
log.warn("Saved per 1 sec: {}", saveCounter.get() * 1000 / totalTime); |
|||
} |
|||
|
|||
private void warmup() throws Exception { |
|||
List<ListenableFuture<?>> futures = new ArrayList<>(); |
|||
for (int i = 0; i < AMOUNT_OF_UNIQ_KEY; i++) { |
|||
futures.add(save(generateStrEntry(i))); |
|||
futures.add(save(generateLngEntry(i))); |
|||
futures.add(save(generateDblEntry(i))); |
|||
futures.add(save(generateBoolEntry(i))); |
|||
} |
|||
Futures.allAsList(futures).get(60, TimeUnit.SECONDS); |
|||
} |
|||
|
|||
private ListenableFuture<?> save(TsKvEntry tsKvEntry) { |
|||
return Futures.transformAsync(testExecutor.submit(() -> timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)), result -> { |
|||
saveCounter.incrementAndGet(); |
|||
return result; |
|||
}, testExecutor); |
|||
} |
|||
|
|||
private TsKvEntry generateStrEntry(int keyIndex) { |
|||
return new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(STRING_KEY + keyIndex, RandomStringUtils.random(10))); |
|||
} |
|||
|
|||
private TsKvEntry generateLngEntry(int keyIndex) { |
|||
return new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(LONG_KEY + keyIndex, random.nextLong())); |
|||
} |
|||
|
|||
private TsKvEntry generateDblEntry(int keyIndex) { |
|||
return new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(DOUBLE_KEY + keyIndex, random.nextDouble())); |
|||
} |
|||
|
|||
private TsKvEntry generateBoolEntry(int keyIndex) { |
|||
return new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(BOOLEAN_KEY + keyIndex, random.nextBoolean())); |
|||
} |
|||
|
|||
private int getRandomKey() { |
|||
return random.nextInt(AMOUNT_OF_UNIQ_KEY); |
|||
} |
|||
|
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue