|
|
|
@ -19,10 +19,12 @@ import com.google.common.util.concurrent.Futures; |
|
|
|
import com.google.common.util.concurrent.ListenableFuture; |
|
|
|
import com.google.common.util.concurrent.MoreExecutors; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|
|
|
import org.springframework.cache.Cache; |
|
|
|
import org.springframework.context.annotation.Primary; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceProfileId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
@ -34,6 +36,7 @@ import org.thingsboard.server.common.stats.StatsFactory; |
|
|
|
import org.thingsboard.server.dao.cache.CacheExecutorService; |
|
|
|
import org.thingsboard.server.dao.service.Validator; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Collection; |
|
|
|
import java.util.HashMap; |
|
|
|
@ -43,6 +46,7 @@ import java.util.Map; |
|
|
|
import java.util.Objects; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.concurrent.Executor; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; |
|
|
|
@ -53,12 +57,17 @@ import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; |
|
|
|
@Slf4j |
|
|
|
public class CachedAttributesService implements AttributesService { |
|
|
|
private static final String STATS_NAME = "attributes.cache"; |
|
|
|
public static final String LOCAL_CACHE_TYPE = "caffeine"; |
|
|
|
|
|
|
|
private final AttributesDao attributesDao; |
|
|
|
private final AttributesCacheWrapper cacheWrapper; |
|
|
|
private final CacheExecutorService cacheExecutorService; |
|
|
|
private final DefaultCounter hitCounter; |
|
|
|
private final DefaultCounter missCounter; |
|
|
|
private final CacheExecutorService cacheExecutorService; |
|
|
|
private Executor cacheExecutor; |
|
|
|
|
|
|
|
@Value("${cache.type}") |
|
|
|
private String cacheType; |
|
|
|
|
|
|
|
public CachedAttributesService(AttributesDao attributesDao, |
|
|
|
AttributesCacheWrapper cacheWrapper, |
|
|
|
@ -72,6 +81,25 @@ public class CachedAttributesService implements AttributesService { |
|
|
|
this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); |
|
|
|
} |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init() { |
|
|
|
this.cacheExecutor = getExecutor(cacheType, cacheExecutorService); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Will return: |
|
|
|
* - for the <b>local</b> cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread) |
|
|
|
* - for the <b>remote</b> cache: dedicated thread pool for the cache IO calls to unblock any caller thread |
|
|
|
* */ |
|
|
|
Executor getExecutor(String cacheType, CacheExecutorService cacheExecutorService) { |
|
|
|
if (StringUtils.isEmpty(cacheType) || LOCAL_CACHE_TYPE.equals(cacheType)) { |
|
|
|
log.info("Going to use directExecutor for the local cache type {}", cacheType); |
|
|
|
return MoreExecutors.directExecutor(); |
|
|
|
} |
|
|
|
log.info("Going to use cacheExecutorService for the remote cache type {}", cacheType); |
|
|
|
return cacheExecutorService; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) { |
|
|
|
validate(entityId, scope); |
|
|
|
@ -90,7 +118,7 @@ public class CachedAttributesService implements AttributesService { |
|
|
|
// TODO: think if it's a good idea to store 'empty' attributes
|
|
|
|
cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null)); |
|
|
|
return foundAttrKvEntry; |
|
|
|
}, cacheExecutorService); |
|
|
|
}, cacheExecutor); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -113,7 +141,7 @@ public class CachedAttributesService implements AttributesService { |
|
|
|
notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); |
|
|
|
|
|
|
|
ListenableFuture<List<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); |
|
|
|
return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService); |
|
|
|
return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutor); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
@ -171,7 +199,7 @@ public class CachedAttributesService implements AttributesService { |
|
|
|
|
|
|
|
// TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache
|
|
|
|
List<String> attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); |
|
|
|
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); |
|
|
|
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
@ -179,7 +207,7 @@ public class CachedAttributesService implements AttributesService { |
|
|
|
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) { |
|
|
|
validate(entityId, scope); |
|
|
|
ListenableFuture<List<Void>> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); |
|
|
|
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); |
|
|
|
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor); |
|
|
|
return future; |
|
|
|
} |
|
|
|
|
|
|
|
|