Browse Source

Fix multi-key cache transaction for attributes service

pull/6780/head
Andrii Shvaika 4 years ago
parent
commit
ebbf583392
  1. 14
      common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java
  2. 6
      common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java
  3. 2
      dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCacheKey.java

14
common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java

@ -23,6 +23,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.jedis.JedisClusterConnection;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@ -45,7 +46,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
@Getter
private final String cacheName;
private final RedisConnectionFactory connectionFactory;
private final JedisConnectionFactory connectionFactory;
private final RedisSerializer<String> keySerializer = new StringRedisSerializer();
private final RedisSerializer<V> valueSerializer;
private final Expiration evictExpiration;
@ -57,7 +58,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
TBRedisCacheConfiguration configuration,
RedisSerializer<V> valueSerializer) {
this.cacheName = cacheName;
this.connectionFactory = connectionFactory;
this.connectionFactory = (JedisConnectionFactory) connectionFactory;
this.valueSerializer = valueSerializer;
this.evictExpiration = Expiration.from(configuration.getEvictTtlInMs(), TimeUnit.MILLISECONDS);
this.cacheTtl = Optional.ofNullable(cacheSpecsMap)
@ -137,11 +138,11 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return new RedisTbCacheTransaction<>(this, connection);
}
RedisConnection getConnection(byte[] rawKey) {
RedisConnection connection = connectionFactory.getClusterConnection();
if (!(connection instanceof JedisClusterConnection)) {
return connection;
private RedisConnection getConnection(byte[] rawKey) {
if (!connectionFactory.isRedisClusterAware()) {
return connectionFactory.getConnection();
}
RedisConnection connection = connectionFactory.getClusterConnection();
int slotNum = JedisClusterCRC16.getSlot(rawKey);
Jedis jedis = ((JedisClusterConnection) connection).getNativeConnection().getConnectionFromSlot(slotNum);
@ -153,7 +154,6 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
}
private RedisConnection watch(byte[][] rawKeysList) {
//TODO process keys only on suitable slot connection, see getConnection(byte[] rawKey)
RedisConnection connection = getConnection(rawKeysList[0]);
try {
connection.watch(rawKeysList);

6
common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java

@ -39,6 +39,12 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
TbCacheTransaction<K, V> newTransactionForKey(K key);
/**
* Note that all keys should be in the same cache slot for redis. You may control the cache slot using '{}' bracers.
* See CLUSTER KEYSLOT command for more details.
* @param keys - list of keys to use
* @return transaction object
*/
TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys);
default V getAndPutInTransaction(K key, Supplier<V> dbCall, boolean cacheNullValue) {

2
dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCacheKey.java

@ -34,6 +34,6 @@ public class AttributeCacheKey implements Serializable {
@Override
public String toString() {
return entityId + "_" + scope + "_" + key;
return "{" + entityId + "}" + scope + "_" + key;
}
}

Loading…
Cancel
Save