diff --git a/common/cache/pom.xml b/common/cache/pom.xml index 47834cb6bb..b52984e0fc 100644 --- a/common/cache/pom.xml +++ b/common/cache/pom.xml @@ -56,6 +56,10 @@ com.github.ben-manes.caffeine caffeine + + javax.annotation + javax.annotation-api + org.apache.commons commons-lang3 diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCacheManager.java b/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCacheManager.java deleted file mode 100644 index e9dba40cf1..0000000000 --- a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCacheManager.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.thingsboard.server.cache; - -import lombok.RequiredArgsConstructor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.cache.Cache; -import org.springframework.cache.CacheManager; -import org.springframework.stereotype.Service; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) -@Service -@RequiredArgsConstructor -public class CaffeineTbTransactionalCacheManager implements TbTransactionalCache { - - private final CacheManager cacheManager; - private final ConcurrentMap caches = new ConcurrentHashMap<>(); - - @Override - public Cache.ValueWrapper get(String cacheName, K key) { - return cacheManager.getCache(cacheName).get(key); - } - - @Override - public void putIfAbsent(String cacheName, K key, V value) { - getCache(cacheName).putIfAbsent(key, value); - } - - @Override - public void evict(String cacheName, K key) { - getCache(cacheName).evict(key); - } - - @Override - public TbCacheTransaction newTransactionForKey(String cacheName, K key) { - return getCache(cacheName).newTransaction(Collections.singletonList(key)); - } - - @Override - public TbCacheTransaction newTransactionForKeys(String cacheName, List keys) { - return getCache(cacheName).newTransaction(keys); - } - - private CaffeineCacheTransactionStorage getCache(String cacheName) { - return caches.computeIfAbsent(cacheName, cn -> new CaffeineCacheTransactionStorage(cacheName, this)); - } - - void doPutIfAbsent(String cacheName, Object key, Object value) { - cacheManager.getCache(cacheName).putIfAbsent(key, value); - } - - void doEvict(String cacheName, K key) { - cacheManager.getCache(cacheName).evict(key); - } -} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCacheManager.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCacheManager.java deleted file mode 100644 index a4df25aeac..0000000000 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCacheManager.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.thingsboard.server.cache; - -import lombok.RequiredArgsConstructor; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.cache.Cache; -import org.springframework.cache.CacheManager; -import org.springframework.stereotype.Service; - -import java.io.Serializable; -import java.util.List; - -@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") -@Service -@RequiredArgsConstructor -public class RedisTbTransactionalCacheManager implements TbTransactionalCache { - - private final CacheManager cacheManager; - - @Override - public Cache.ValueWrapper get(String cacheName, K key) { - return cacheManager.getCache(cacheName).get(key); - } - - @Override - public void putIfAbsent(String cacheName, K key, V value) { - } - - @Override - public void evict(String cacheName, K key) { - } - - @Override - public TbCacheTransaction newTransactionForKey(String cacheName, K key) { - return null; - } - - @Override - public TbCacheTransaction newTransactionForKeys(String cacheName, List keys) { - return null; - } - -} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TbCacheTransaction.java b/common/cache/src/main/java/org/thingsboard/server/cache/TbCacheTransaction.java index 22bbec4489..47779c251d 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/TbCacheTransaction.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TbCacheTransaction.java @@ -1,12 +1,27 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.cache; import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.Executor; -public interface TbCacheTransaction { +public interface TbCacheTransaction { - void putIfAbsent(K key, V value); + void putIfAbsent(K key, V value); boolean commit(); diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TbCacheValueWrapper.java b/common/cache/src/main/java/org/thingsboard/server/cache/TbCacheValueWrapper.java new file mode 100644 index 0000000000..66d7a64f15 --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TbCacheValueWrapper.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache; + +public interface TbCacheValueWrapper { + + T get(); + +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java index ad370f3d18..208e5856ab 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java @@ -1,20 +1,35 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.cache; -import org.springframework.cache.Cache; - import java.io.Serializable; import java.util.List; -public interface TbTransactionalCache { +public interface TbTransactionalCache { + + String getCacheName(); - Cache.ValueWrapper get(String cacheName, K key); + TbCacheValueWrapper get(K key); - void putIfAbsent(String cacheName, K key, V value); + void putIfAbsent(K key, V value); - void evict(String cacheName, K key); + void evict(K key); - TbCacheTransaction newTransactionForKey(String cacheName, K key); + TbCacheTransaction newTransactionForKey(K key); - TbCacheTransaction newTransactionForKeys(String cacheName, List keys); + TbCacheTransaction newTransactionForKeys(List keys); } diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java index 3117e1fa43..f6ec28d170 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java @@ -21,7 +21,6 @@ import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.stereotype.Service; -import static org.thingsboard.server.common.data.CacheConstants.OTA_PACKAGE_CACHE; import static org.thingsboard.server.common.data.CacheConstants.OTA_PACKAGE_DATA_CACHE; @Service diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java new file mode 100644 index 0000000000..01dbde9998 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.CacheManager; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.dao.cache.CaffeineTbTransactionalCache; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) +@Service("AttributeCache") +public class AttributeCaffeineCache extends CaffeineTbTransactionalCache { + + public AttributeCaffeineCache(CacheManager cacheManager) { + super(cacheManager, CacheConstants.ATTRIBUTES_CACHE); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java new file mode 100644 index 0000000000..940c96eb00 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.dao.cache.RedisTbTransactionalCache; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") +@Service("AttributeCache") +public class AttributeRedisCache extends RedisTbTransactionalCache { + + public AttributeRedisCache(CacheManager cacheManager, RedisConnectionFactory connectionFactory) { + super(cacheManager, CacheConstants.ATTRIBUTES_CACHE, connectionFactory); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index 508cf4c16a..a147159158 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index d9e172c113..919ff1f4cd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,6 +25,7 @@ import org.springframework.cache.Cache; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -34,7 +35,6 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.cache.CacheExecutorService; -import org.thingsboard.server.cache.TbCacheTransaction; import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.dao.service.Validator; @@ -65,7 +65,7 @@ public class CachedAttributesService implements AttributesService { private final CacheExecutorService cacheExecutorService; private final DefaultCounter hitCounter; private final DefaultCounter missCounter; - private final TbTransactionalCache cache; + private final TbTransactionalCache cache; private Executor cacheExecutor; @Value("${cache.type}") @@ -74,7 +74,7 @@ public class CachedAttributesService implements AttributesService { public CachedAttributesService(AttributesDao attributesDao, StatsFactory statsFactory, CacheExecutorService cacheExecutorService, - TbTransactionalCache cache) { + TbTransactionalCache cache) { this.attributesDao = attributesDao; this.cacheExecutorService = cacheExecutorService; this.cache = cache; @@ -109,14 +109,14 @@ public class CachedAttributesService implements AttributesService { Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey); - Cache.ValueWrapper cachedAttributeValue = cache.get(CacheConstants.ATTRIBUTES_CACHE, attributeCacheKey); + TbCacheValueWrapper cachedAttributeValue = cache.get(attributeCacheKey); if (cachedAttributeValue != null) { hitCounter.increment(); - AttributeKvEntry cachedAttributeKvEntry = (AttributeKvEntry) cachedAttributeValue.get(); + AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get(); return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry)); } else { missCounter.increment(); - TbCacheTransaction cacheTransaction = cache.newTransactionForKey(CacheConstants.ATTRIBUTES_CACHE, attributeCacheKey); + var cacheTransaction = cache.newTransactionForKey(attributeCacheKey); ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, attributeKey); cacheTransaction.rollBackOnFailure(result, cacheExecutor); return Futures.transform(result, foundAttrKvEntry -> { @@ -132,10 +132,10 @@ public class CachedAttributesService implements AttributesService { validate(entityId, scope); attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey)); - Map wrappedCachedAttributes = findCachedAttributes(entityId, scope, attributeKeys); + Map> wrappedCachedAttributes = findCachedAttributes(entityId, scope, attributeKeys); List cachedAttributes = wrappedCachedAttributes.values().stream() - .map(wrappedCachedAttribute -> (AttributeKvEntry) wrappedCachedAttribute.get()) + .map(TbCacheValueWrapper::get) .filter(Objects::nonNull) .collect(Collectors.toList()); if (wrappedCachedAttributes.size() == attributeKeys.size()) { @@ -147,7 +147,7 @@ public class CachedAttributesService implements AttributesService { List notFoundKeys = notFoundAttributeKeys.stream().map(k -> new AttributeCacheKey(scope, entityId, k)).collect(Collectors.toList()); - TbCacheTransaction cacheTransaction = cache.newTransactionForKeys(CacheConstants.ATTRIBUTES_CACHE, notFoundKeys); + var cacheTransaction = cache.newTransactionForKeys(notFoundKeys); ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); return Futures.transform(result, foundInDbAttributes -> { for (AttributeKvEntry foundInDbAttribute : foundInDbAttributes) { @@ -166,10 +166,10 @@ public class CachedAttributesService implements AttributesService { } - private Map findCachedAttributes(EntityId entityId, String scope, Collection attributeKeys) { - Map cachedAttributes = new HashMap<>(); + private Map> findCachedAttributes(EntityId entityId, String scope, Collection attributeKeys) { + Map> cachedAttributes = new HashMap<>(); for (String attributeKey : attributeKeys) { - Cache.ValueWrapper cachedAttributeValue = cache.get(CacheConstants.ATTRIBUTES_CACHE, new AttributeCacheKey(scope, entityId, attributeKey)); + var cachedAttributeValue = cache.get(new AttributeCacheKey(scope, entityId, attributeKey)); if (cachedAttributeValue != null) { hitCounter.increment(); cachedAttributes.put(attributeKey, cachedAttributeValue); @@ -205,7 +205,7 @@ public class CachedAttributesService implements AttributesService { for (var attribute : attributes) { ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); futures.add(Futures.transform(future, key -> { - cache.evict(CacheConstants.ATTRIBUTES_CACHE, new AttributeCacheKey(scope, entityId, key)); + cache.evict(new AttributeCacheKey(scope, entityId, key)); return key; }, cacheExecutor)); } @@ -218,7 +218,7 @@ public class CachedAttributesService implements AttributesService { validate(entityId, scope); List> futures = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, key -> { - cache.evict(CacheConstants.ATTRIBUTES_CACHE, new AttributeCacheKey(scope, entityId, key)); + cache.evict(new AttributeCacheKey(scope, entityId, key)); return key; }, cacheExecutor)).collect(Collectors.toList())); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheTransactionStorage.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheTransactionStorage.java new file mode 100644 index 0000000000..af3c40b333 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheTransactionStorage.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; + +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.cache.TbCacheTransaction; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@RequiredArgsConstructor +class CaffeineCacheTransactionStorage { + + + + + + +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbCacheTransaction.java similarity index 55% rename from common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java rename to dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbCacheTransaction.java index 202d9c4007..dc3ad0cf08 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbCacheTransaction.java @@ -1,4 +1,19 @@ -package org.thingsboard.server.cache; +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -8,7 +23,9 @@ import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; +import org.thingsboard.server.cache.TbCacheTransaction; +import java.io.Serializable; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -17,19 +34,20 @@ import java.util.concurrent.Executor; @Slf4j @RequiredArgsConstructor -public class CaffeineTbCacheTransaction implements TbCacheTransaction { +public class CaffeineTbCacheTransaction implements TbCacheTransaction { @Getter private final UUID id = UUID.randomUUID(); - private final CaffeineCacheTransactionStorage cache; + private final CaffeineTbTransactionalCache cache; @Getter - private final List keys; - @Getter @Setter + private final List keys; + @Getter + @Setter private boolean failed; private final Map pendingPuts = new LinkedHashMap<>(); @Override - public void putIfAbsent(K key, V value) { + public void putIfAbsent(K key, V value) { pendingPuts.put(key, value); } @@ -45,7 +63,7 @@ public class CaffeineTbCacheTransaction implements TbCacheTransaction { @Override public void rollBackOnFailure(ListenableFuture future, Executor executor) { - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable T result) { } diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineCacheTransactionStorage.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java similarity index 52% rename from common/cache/src/main/java/org/thingsboard/server/cache/CaffeineCacheTransactionStorage.java rename to dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java index 8a9a9850fc..563c41ef8a 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineCacheTransactionStorage.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java @@ -1,8 +1,29 @@ -package org.thingsboard.server.cache; +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; +import lombok.Getter; import lombok.RequiredArgsConstructor; +import org.springframework.cache.CacheManager; +import org.thingsboard.server.cache.TbCacheTransaction; +import org.thingsboard.server.cache.TbCacheValueWrapper; +import org.thingsboard.server.cache.TbTransactionalCache; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -13,45 +34,71 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor -class CaffeineCacheTransactionStorage { +public abstract class CaffeineTbTransactionalCache implements TbTransactionalCache { + private final CacheManager cacheManager; + @Getter private final String cacheName; - private final CaffeineTbTransactionalCacheManager cache; + private final Lock lock = new ReentrantLock(); - private final Map> objectTransactions = new HashMap<>(); - private final Map transactions = new HashMap<>(); + private final Map> objectTransactions = new HashMap<>(); + private final Map> transactions = new HashMap<>(); + @Override + public TbCacheValueWrapper get(K key) { + return SimpleTbCacheValueWrapper.wrap(cacheManager.getCache(cacheName).get(key)); + } - TbCacheTransaction newTransaction(List keys) { + @Override + public void putIfAbsent(K key, V value) { lock.lock(); try { - var transaction = new CaffeineTbCacheTransaction(this, keys); - var transactionId = transaction.getId(); - for (K key : keys) { - objectTransactions.computeIfAbsent(key, k -> new HashSet<>()).add(transactionId); - } - transactions.put(transactionId, transaction); - return transaction; + failAllTransactionsByKey(key); + doPutIfAbsent(key, value); } finally { lock.unlock(); } } - void putIfAbsent(K key, V value) { + @Override + public void evict(K key) { lock.lock(); try { failAllTransactionsByKey(key); - cache.doPutIfAbsent(cacheName, key, value); + doEvict(key); } finally { lock.unlock(); } } - public void evict(K key) { + @Override + public TbCacheTransaction newTransactionForKey(K key) { + return newTransaction(Collections.singletonList(key)); + } + + @Override + public TbCacheTransaction newTransactionForKeys(List keys) { + return newTransaction(keys); + } + + void doPutIfAbsent(Object key, Object value) { + cacheManager.getCache(cacheName).putIfAbsent(key, value); + } + + void doEvict(K key) { + cacheManager.getCache(cacheName).evict(key); + } + + TbCacheTransaction newTransaction(List keys) { lock.lock(); try { - failAllTransactionsByKey(key); - cache.doEvict(cacheName, key); + var transaction = new CaffeineTbCacheTransaction<>(this, keys); + var transactionId = transaction.getId(); + for (K key : keys) { + objectTransactions.computeIfAbsent(key, k -> new HashSet<>()).add(transactionId); + } + transactions.put(transactionId, transaction); + return transaction; } finally { lock.unlock(); } @@ -63,7 +110,7 @@ class CaffeineCacheTransactionStorage { var tr = transactions.get(trId); var success = !tr.isFailed(); if (success) { - for (Object key : tr.getKeys()) { + for (K key : tr.getKeys()) { Set otherTransactions = objectTransactions.get(key); if (otherTransactions != null) { for (UUID otherTrId : otherTransactions) { @@ -73,7 +120,7 @@ class CaffeineCacheTransactionStorage { } } } - pendingPuts.forEach((k, v) -> cache.doPutIfAbsent(cacheName, k, v)); + pendingPuts.forEach(this::doPutIfAbsent); } removeTransaction(trId); return success; @@ -92,7 +139,7 @@ class CaffeineCacheTransactionStorage { } private void removeTransaction(UUID id) { - CaffeineTbCacheTransaction transaction = transactions.remove(id); + CaffeineTbCacheTransaction transaction = transactions.remove(id); if (transaction != null) { for (var key : transaction.getKeys()) { Set transactions = objectTransactions.get(key); @@ -106,7 +153,7 @@ class CaffeineCacheTransactionStorage { } } - private void failAllTransactionsByKey(K key) { + private void failAllTransactionsByKey(K key) { Set transactionsIds = objectTransactions.get(key); if (transactionsIds != null) { for (UUID otherTrId : transactionsIds) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java b/dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java new file mode 100644 index 0000000000..dcb16b42e8 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.springframework.cache.CacheManager; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.thingsboard.server.cache.TbCacheTransaction; +import org.thingsboard.server.cache.TbCacheValueWrapper; +import org.thingsboard.server.cache.TbTransactionalCache; + +import java.io.Serializable; +import java.util.List; + +@RequiredArgsConstructor +public abstract class RedisTbTransactionalCache implements TbTransactionalCache { + + private final CacheManager cacheManager; + @Getter + private final String cacheName; + private final RedisConnectionFactory connectionFactory; + + @Override + public TbCacheValueWrapper get(K key) { + return null; + } + + @Override + public void putIfAbsent(K key, V value) { + + } + + @Override + public void evict(K key) { + + } + + @Override + public TbCacheTransaction newTransactionForKey(K key) { + return null; + } + + @Override + public TbCacheTransaction newTransactionForKeys(List keys) { + return null; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/SimpleTbCacheValueWrapper.java b/dao/src/main/java/org/thingsboard/server/dao/cache/SimpleTbCacheValueWrapper.java new file mode 100644 index 0000000000..8e08582477 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/SimpleTbCacheValueWrapper.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.cache; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import org.springframework.cache.Cache; +import org.thingsboard.server.cache.TbCacheValueWrapper; + +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class SimpleTbCacheValueWrapper implements TbCacheValueWrapper { + + private final T value; + + @Override + public T get() { + return value; + } + + @SuppressWarnings("unchecked") + public static SimpleTbCacheValueWrapper wrap(Cache.ValueWrapper source) { + return source == null ? null : new SimpleTbCacheValueWrapper<>((T) source.get()); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index e2e1ba7d2b..36ab0d9f85 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java index c1ee7f3fd3..115def0492 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index 185dae1c2f..0c5fbd5d3a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/AttributeServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/AttributeServiceTest.java index 02b9ef54d6..c67122994c 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/AttributeServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/AttributeServiceTest.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.dao.sql.attributes; import com.google.common.util.concurrent.Futures; diff --git a/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/RedisAttributeServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/RedisAttributeServiceTest.java index 09a0ccf9d4..ce0afb8f60 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/RedisAttributeServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sql/attributes/RedisAttributeServiceTest.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.dao.sql.attributes; import lombok.extern.slf4j.Slf4j;