Browse Source

Transactional Redis Cache interface

pull/6536/head
Andrii Shvaika 4 years ago
parent
commit
7eaa70e472
  1. 4
      common/cache/pom.xml
  2. 59
      common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCacheManager.java
  3. 42
      common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCacheManager.java
  4. 19
      common/cache/src/main/java/org/thingsboard/server/cache/TbCacheTransaction.java
  5. 22
      common/cache/src/main/java/org/thingsboard/server/cache/TbCacheValueWrapper.java
  6. 31
      common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java
  7. 1
      common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java
  8. 33
      dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java
  9. 33
      dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java
  10. 2
      dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
  11. 30
      dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java
  12. 39
      dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheTransactionStorage.java
  13. 32
      dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbCacheTransaction.java
  14. 91
      dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java
  15. 61
      dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java
  16. 37
      dao/src/main/java/org/thingsboard/server/dao/cache/SimpleTbCacheValueWrapper.java
  17. 2
      dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
  18. 2
      dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java
  19. 2
      dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
  20. 15
      dao/src/test/java/org/thingsboard/server/dao/sql/attributes/AttributeServiceTest.java
  21. 15
      dao/src/test/java/org/thingsboard/server/dao/sql/attributes/RedisAttributeServiceTest.java

4
common/cache/pom.xml

@ -56,6 +56,10 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

59
common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCacheManager.java

@ -1,59 +0,0 @@
package org.thingsboard.server.cache;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
@Service
@RequiredArgsConstructor
public class CaffeineTbTransactionalCacheManager implements TbTransactionalCache {
private final CacheManager cacheManager;
private final ConcurrentMap<String, CaffeineCacheTransactionStorage> caches = new ConcurrentHashMap<>();
@Override
public <K extends Serializable> Cache.ValueWrapper get(String cacheName, K key) {
return cacheManager.getCache(cacheName).get(key);
}
@Override
public <K extends Serializable, V extends Serializable> void putIfAbsent(String cacheName, K key, V value) {
getCache(cacheName).putIfAbsent(key, value);
}
@Override
public <K extends Serializable> void evict(String cacheName, K key) {
getCache(cacheName).evict(key);
}
@Override
public <K extends Serializable> TbCacheTransaction newTransactionForKey(String cacheName, K key) {
return getCache(cacheName).newTransaction(Collections.singletonList(key));
}
@Override
public <K extends Serializable> TbCacheTransaction newTransactionForKeys(String cacheName, List<K> keys) {
return getCache(cacheName).newTransaction(keys);
}
private CaffeineCacheTransactionStorage getCache(String cacheName) {
return caches.computeIfAbsent(cacheName, cn -> new CaffeineCacheTransactionStorage(cacheName, this));
}
<K extends Serializable, V extends Serializable> void doPutIfAbsent(String cacheName, Object key, Object value) {
cacheManager.getCache(cacheName).putIfAbsent(key, value);
}
<K extends Serializable> void doEvict(String cacheName, K key) {
cacheManager.getCache(cacheName).evict(key);
}
}

42
common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCacheManager.java

@ -1,42 +0,0 @@
package org.thingsboard.server.cache;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.util.List;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service
@RequiredArgsConstructor
public class RedisTbTransactionalCacheManager implements TbTransactionalCache {
private final CacheManager cacheManager;
@Override
public <K extends Serializable> Cache.ValueWrapper get(String cacheName, K key) {
return cacheManager.getCache(cacheName).get(key);
}
@Override
public <K extends Serializable, V extends Serializable> void putIfAbsent(String cacheName, K key, V value) {
}
@Override
public <K extends Serializable> void evict(String cacheName, K key) {
}
@Override
public <K extends Serializable> TbCacheTransaction newTransactionForKey(String cacheName, K key) {
return null;
}
@Override
public <K extends Serializable> TbCacheTransaction newTransactionForKeys(String cacheName, List<K> keys) {
return null;
}
}

19
common/cache/src/main/java/org/thingsboard/server/cache/TbCacheTransaction.java

@ -1,12 +1,27 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Executor;
public interface TbCacheTransaction {
public interface TbCacheTransaction<K, V> {
<K,V> void putIfAbsent(K key, V value);
void putIfAbsent(K key, V value);
boolean commit();

22
common/cache/src/main/java/org/thingsboard/server/cache/TbCacheValueWrapper.java

@ -0,0 +1,22 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
public interface TbCacheValueWrapper<T> {
T get();
}

31
common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java

@ -1,20 +1,35 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import org.springframework.cache.Cache;
import java.io.Serializable;
import java.util.List;
public interface TbTransactionalCache {
public interface TbTransactionalCache<K extends Serializable, V extends Serializable> {
String getCacheName();
<K extends Serializable> Cache.ValueWrapper get(String cacheName, K key);
TbCacheValueWrapper<V> get(K key);
<K extends Serializable, V extends Serializable> void putIfAbsent(String cacheName, K key, V value);
void putIfAbsent(K key, V value);
<K extends Serializable> void evict(String cacheName, K key);
void evict(K key);
<K extends Serializable> TbCacheTransaction newTransactionForKey(String cacheName, K key);
TbCacheTransaction<K, V> newTransactionForKey(K key);
<K extends Serializable> TbCacheTransaction newTransactionForKeys(String cacheName, List<K> keys);
TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys);
}

1
common/cache/src/main/java/org/thingsboard/server/cache/ota/RedisOtaPackageDataCache.java

@ -21,7 +21,6 @@ import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.stereotype.Service;
import static org.thingsboard.server.common.data.CacheConstants.OTA_PACKAGE_CACHE;
import static org.thingsboard.server.common.data.CacheConstants.OTA_PACKAGE_DATA_CACHE;
@Service

33
dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.cache.CaffeineTbTransactionalCache;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
@Service("AttributeCache")
public class AttributeCaffeineCache extends CaffeineTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public AttributeCaffeineCache(CacheManager cacheManager) {
super(cacheManager, CacheConstants.ATTRIBUTES_CACHE);
}
}

33
dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.cache.RedisTbTransactionalCache;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service("AttributeCache")
public class AttributeRedisCache extends RedisTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public AttributeRedisCache(CacheManager cacheManager, RedisConnectionFactory connectionFactory) {
super(cacheManager, CacheConstants.ATTRIBUTES_CACHE, connectionFactory);
}
}

2
dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

30
dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -25,6 +25,7 @@ import org.springframework.cache.Cache;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceProfileId;
@ -34,7 +35,6 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.cache.CacheExecutorService;
import org.thingsboard.server.cache.TbCacheTransaction;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.dao.service.Validator;
@ -65,7 +65,7 @@ public class CachedAttributesService implements AttributesService {
private final CacheExecutorService cacheExecutorService;
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
private final TbTransactionalCache cache;
private final TbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache;
private Executor cacheExecutor;
@Value("${cache.type}")
@ -74,7 +74,7 @@ public class CachedAttributesService implements AttributesService {
public CachedAttributesService(AttributesDao attributesDao,
StatsFactory statsFactory,
CacheExecutorService cacheExecutorService,
TbTransactionalCache cache) {
TbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache) {
this.attributesDao = attributesDao;
this.cacheExecutorService = cacheExecutorService;
this.cache = cache;
@ -109,14 +109,14 @@ public class CachedAttributesService implements AttributesService {
Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey);
Cache.ValueWrapper cachedAttributeValue = cache.get(CacheConstants.ATTRIBUTES_CACHE, attributeCacheKey);
TbCacheValueWrapper<AttributeKvEntry> cachedAttributeValue = cache.get(attributeCacheKey);
if (cachedAttributeValue != null) {
hitCounter.increment();
AttributeKvEntry cachedAttributeKvEntry = (AttributeKvEntry) cachedAttributeValue.get();
AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get();
return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry));
} else {
missCounter.increment();
TbCacheTransaction cacheTransaction = cache.newTransactionForKey(CacheConstants.ATTRIBUTES_CACHE, attributeCacheKey);
var cacheTransaction = cache.newTransactionForKey(attributeCacheKey);
ListenableFuture<Optional<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
cacheTransaction.rollBackOnFailure(result, cacheExecutor);
return Futures.transform(result, foundAttrKvEntry -> {
@ -132,10 +132,10 @@ public class CachedAttributesService implements AttributesService {
validate(entityId, scope);
attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
Map<String, Cache.ValueWrapper> wrappedCachedAttributes = findCachedAttributes(entityId, scope, attributeKeys);
Map<String, TbCacheValueWrapper<AttributeKvEntry>> wrappedCachedAttributes = findCachedAttributes(entityId, scope, attributeKeys);
List<AttributeKvEntry> cachedAttributes = wrappedCachedAttributes.values().stream()
.map(wrappedCachedAttribute -> (AttributeKvEntry) wrappedCachedAttribute.get())
.map(TbCacheValueWrapper::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (wrappedCachedAttributes.size() == attributeKeys.size()) {
@ -147,7 +147,7 @@ public class CachedAttributesService implements AttributesService {
List<AttributeCacheKey> notFoundKeys = notFoundAttributeKeys.stream().map(k -> new AttributeCacheKey(scope, entityId, k)).collect(Collectors.toList());
TbCacheTransaction cacheTransaction = cache.newTransactionForKeys(CacheConstants.ATTRIBUTES_CACHE, notFoundKeys);
var cacheTransaction = cache.newTransactionForKeys(notFoundKeys);
ListenableFuture<List<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
return Futures.transform(result, foundInDbAttributes -> {
for (AttributeKvEntry foundInDbAttribute : foundInDbAttributes) {
@ -166,10 +166,10 @@ public class CachedAttributesService implements AttributesService {
}
private Map<String, Cache.ValueWrapper> findCachedAttributes(EntityId entityId, String scope, Collection<String> attributeKeys) {
Map<String, Cache.ValueWrapper> cachedAttributes = new HashMap<>();
private Map<String, TbCacheValueWrapper<AttributeKvEntry>> findCachedAttributes(EntityId entityId, String scope, Collection<String> attributeKeys) {
Map<String, TbCacheValueWrapper<AttributeKvEntry>> cachedAttributes = new HashMap<>();
for (String attributeKey : attributeKeys) {
Cache.ValueWrapper cachedAttributeValue = cache.get(CacheConstants.ATTRIBUTES_CACHE, new AttributeCacheKey(scope, entityId, attributeKey));
var cachedAttributeValue = cache.get(new AttributeCacheKey(scope, entityId, attributeKey));
if (cachedAttributeValue != null) {
hitCounter.increment();
cachedAttributes.put(attributeKey, cachedAttributeValue);
@ -205,7 +205,7 @@ public class CachedAttributesService implements AttributesService {
for (var attribute : attributes) {
ListenableFuture<String> future = attributesDao.save(tenantId, entityId, scope, attribute);
futures.add(Futures.transform(future, key -> {
cache.evict(CacheConstants.ATTRIBUTES_CACHE, new AttributeCacheKey(scope, entityId, key));
cache.evict(new AttributeCacheKey(scope, entityId, key));
return key;
}, cacheExecutor));
}
@ -218,7 +218,7 @@ public class CachedAttributesService implements AttributesService {
validate(entityId, scope);
List<ListenableFuture<String>> futures = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, key -> {
cache.evict(CacheConstants.ATTRIBUTES_CACHE, new AttributeCacheKey(scope, entityId, key));
cache.evict(new AttributeCacheKey(scope, entityId, key));
return key;
}, cacheExecutor)).collect(Collectors.toList()));
}

39
dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineCacheTransactionStorage.java

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.cache.TbCacheTransaction;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@RequiredArgsConstructor
class CaffeineCacheTransactionStorage<K extends Serializable, V extends Serializable> {
}

32
common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java → dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbCacheTransaction.java

@ -1,4 +1,19 @@
package org.thingsboard.server.cache;
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -8,7 +23,9 @@ import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.server.cache.TbCacheTransaction;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -17,19 +34,20 @@ import java.util.concurrent.Executor;
@Slf4j
@RequiredArgsConstructor
public class CaffeineTbCacheTransaction implements TbCacheTransaction {
public class CaffeineTbCacheTransaction<K extends Serializable, V extends Serializable> implements TbCacheTransaction<K, V> {
@Getter
private final UUID id = UUID.randomUUID();
private final CaffeineCacheTransactionStorage cache;
private final CaffeineTbTransactionalCache<K, V> cache;
@Getter
private final List<?> keys;
@Getter @Setter
private final List<K> keys;
@Getter
@Setter
private boolean failed;
private final Map<Object, Object> pendingPuts = new LinkedHashMap<>();
@Override
public <K, V> void putIfAbsent(K key, V value) {
public void putIfAbsent(K key, V value) {
pendingPuts.put(key, value);
}
@ -45,7 +63,7 @@ public class CaffeineTbCacheTransaction implements TbCacheTransaction {
@Override
public <T> void rollBackOnFailure(ListenableFuture<T> future, Executor executor) {
Futures.addCallback(future, new FutureCallback<T>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T result) {
}

91
common/cache/src/main/java/org/thingsboard/server/cache/CaffeineCacheTransactionStorage.java → dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java

@ -1,8 +1,29 @@
package org.thingsboard.server.cache;
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.CacheManager;
import org.thingsboard.server.cache.TbCacheTransaction;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.TbTransactionalCache;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -13,45 +34,71 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@RequiredArgsConstructor
class CaffeineCacheTransactionStorage {
public abstract class CaffeineTbTransactionalCache<K extends Serializable, V extends Serializable> implements TbTransactionalCache<K, V> {
private final CacheManager cacheManager;
@Getter
private final String cacheName;
private final CaffeineTbTransactionalCacheManager cache;
private final Lock lock = new ReentrantLock();
private final Map<Object, Set<UUID>> objectTransactions = new HashMap<>();
private final Map<UUID, CaffeineTbCacheTransaction> transactions = new HashMap<>();
private final Map<K, Set<UUID>> objectTransactions = new HashMap<>();
private final Map<UUID, CaffeineTbCacheTransaction<K, V>> transactions = new HashMap<>();
@Override
public TbCacheValueWrapper<V> get(K key) {
return SimpleTbCacheValueWrapper.wrap(cacheManager.getCache(cacheName).get(key));
}
<K extends Serializable> TbCacheTransaction newTransaction(List<K> keys) {
@Override
public void putIfAbsent(K key, V value) {
lock.lock();
try {
var transaction = new CaffeineTbCacheTransaction(this, keys);
var transactionId = transaction.getId();
for (K key : keys) {
objectTransactions.computeIfAbsent(key, k -> new HashSet<>()).add(transactionId);
}
transactions.put(transactionId, transaction);
return transaction;
failAllTransactionsByKey(key);
doPutIfAbsent(key, value);
} finally {
lock.unlock();
}
}
<K extends Serializable, V extends Serializable> void putIfAbsent(K key, V value) {
@Override
public void evict(K key) {
lock.lock();
try {
failAllTransactionsByKey(key);
cache.doPutIfAbsent(cacheName, key, value);
doEvict(key);
} finally {
lock.unlock();
}
}
public <K extends Serializable> void evict(K key) {
@Override
public TbCacheTransaction<K, V> newTransactionForKey(K key) {
return newTransaction(Collections.singletonList(key));
}
@Override
public TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys) {
return newTransaction(keys);
}
void doPutIfAbsent(Object key, Object value) {
cacheManager.getCache(cacheName).putIfAbsent(key, value);
}
void doEvict(K key) {
cacheManager.getCache(cacheName).evict(key);
}
TbCacheTransaction<K, V> newTransaction(List<K> keys) {
lock.lock();
try {
failAllTransactionsByKey(key);
cache.doEvict(cacheName, key);
var transaction = new CaffeineTbCacheTransaction<>(this, keys);
var transactionId = transaction.getId();
for (K key : keys) {
objectTransactions.computeIfAbsent(key, k -> new HashSet<>()).add(transactionId);
}
transactions.put(transactionId, transaction);
return transaction;
} finally {
lock.unlock();
}
@ -63,7 +110,7 @@ class CaffeineCacheTransactionStorage {
var tr = transactions.get(trId);
var success = !tr.isFailed();
if (success) {
for (Object key : tr.getKeys()) {
for (K key : tr.getKeys()) {
Set<UUID> otherTransactions = objectTransactions.get(key);
if (otherTransactions != null) {
for (UUID otherTrId : otherTransactions) {
@ -73,7 +120,7 @@ class CaffeineCacheTransactionStorage {
}
}
}
pendingPuts.forEach((k, v) -> cache.doPutIfAbsent(cacheName, k, v));
pendingPuts.forEach(this::doPutIfAbsent);
}
removeTransaction(trId);
return success;
@ -92,7 +139,7 @@ class CaffeineCacheTransactionStorage {
}
private void removeTransaction(UUID id) {
CaffeineTbCacheTransaction transaction = transactions.remove(id);
CaffeineTbCacheTransaction<K, V> transaction = transactions.remove(id);
if (transaction != null) {
for (var key : transaction.getKeys()) {
Set<UUID> transactions = objectTransactions.get(key);
@ -106,7 +153,7 @@ class CaffeineCacheTransactionStorage {
}
}
private <K extends Serializable> void failAllTransactionsByKey(K key) {
private void failAllTransactionsByKey(K key) {
Set<UUID> transactionsIds = objectTransactions.get(key);
if (transactionsIds != null) {
for (UUID otherTrId : transactionsIds) {

61
dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.thingsboard.server.cache.TbCacheTransaction;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.TbTransactionalCache;
import java.io.Serializable;
import java.util.List;
@RequiredArgsConstructor
public abstract class RedisTbTransactionalCache<K extends Serializable, V extends Serializable> implements TbTransactionalCache<K, V> {
private final CacheManager cacheManager;
@Getter
private final String cacheName;
private final RedisConnectionFactory connectionFactory;
@Override
public TbCacheValueWrapper<V> get(K key) {
return null;
}
@Override
public void putIfAbsent(K key, V value) {
}
@Override
public void evict(K key) {
}
@Override
public TbCacheTransaction<K, V> newTransactionForKey(K key) {
return null;
}
@Override
public TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys) {
return null;
}
}

37
dao/src/main/java/org/thingsboard/server/dao/cache/SimpleTbCacheValueWrapper.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.Cache;
import org.thingsboard.server.cache.TbCacheValueWrapper;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class SimpleTbCacheValueWrapper<T> implements TbCacheValueWrapper<T> {
private final T value;
@Override
public T get() {
return value;
}
@SuppressWarnings("unchecked")
public static <T> SimpleTbCacheValueWrapper<T> wrap(Cache.ValueWrapper source) {
return source == null ? null : new SimpleTbCacheValueWrapper<>((T) source.get());
}
}

2
dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

2
dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

2
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

15
dao/src/test/java/org/thingsboard/server/dao/sql/attributes/AttributeServiceTest.java

@ -1,3 +1,18 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.attributes;
import com.google.common.util.concurrent.Futures;

15
dao/src/test/java/org/thingsboard/server/dao/sql/attributes/RedisAttributeServiceTest.java

@ -1,3 +1,18 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.attributes;
import lombok.extern.slf4j.Slf4j;

Loading…
Cancel
Save