diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 933c0d38f4..9efa16d911 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -322,6 +322,8 @@ actors: cache: # caffeine or redis type: "${CACHE_TYPE:caffeine}" + attributes: + enabled: "${CACHE_ATTRIBUTES_ENABLED:true}" caffeine: specs: @@ -355,6 +357,9 @@ caffeine: deviceProfiles: timeToLiveInMinutes: 1440 maxSize: 0 + attributes: + timeToLiveInMinutes: 1440 + maxSize: 100000 redis: # standalone or cluster diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java index 8088652588..c3490aa85f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java @@ -26,4 +26,5 @@ public class CacheConstants { public static final String SECURITY_SETTINGS_CACHE = "securitySettings"; public static final String TENANT_PROFILE_CACHE = "tenantProfiles"; public static final String DEVICE_PROFILE_CACHE = "deviceProfiles"; + public static final String ATTRIBUTES_CACHE = "attributes"; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCacheKey.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCacheKey.java new file mode 100644 index 0000000000..05a7c55898 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCacheKey.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.thingsboard.server.common.data.id.EntityId; + +@EqualsAndHashCode +@Getter +@AllArgsConstructor +public class AttributeCacheKey { + private final String scope; + private final EntityId entityId; + private final String key; + + @Override + public String toString() { + return entityId + "_" + scope + "_" + key; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeUtils.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeUtils.java new file mode 100644 index 0000000000..b1fc82fe88 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeUtils.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.dao.exception.IncorrectParameterException; +import org.thingsboard.server.dao.service.Validator; + +public class AttributeUtils { + public static void validate(EntityId id, String scope) { + Validator.validateId(id.getId(), "Incorrect id " + id); + Validator.validateString(scope, "Incorrect scope " + scope); + } + + public static void validate(AttributeKvEntry kvEntry) { + if (kvEntry == null) { + throw new IncorrectParameterException("Key value entry can't be null"); + } else if (kvEntry.getDataType() == null) { + throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null"); + } else { + Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty"); + Validator.validatePositiveNumber(kvEntry.getLastUpdateTs(), "Incorrect last update ts. Ts should be positive"); + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index e70604353a..3eee300627 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -15,31 +15,39 @@ */ package org.thingsboard.server.dao.attributes; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.service.Validator; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; + +import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; /** * @author Andrew Shvayka */ @Service +@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "false", matchIfMissing = true) +@Primary +@Slf4j public class BaseAttributesService implements AttributesService { + private final AttributesDao attributesDao; - @Autowired - private AttributesDao attributesDao; + public BaseAttributesService(AttributesDao attributesDao) { + this.attributesDao = attributesDao; + } @Override public ListenableFuture> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) { @@ -75,33 +83,14 @@ public class BaseAttributesService implements AttributesService { public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { validate(entityId, scope); attributes.forEach(attribute -> validate(attribute)); - List> futures = Lists.newArrayListWithExpectedSize(attributes.size()); - for (AttributeKvEntry attribute : attributes) { - futures.add(attributesDao.save(tenantId, entityId, scope, attribute)); - } - return Futures.allAsList(futures); + + List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); + return Futures.allAsList(saveFutures); } @Override - public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List keys) { + public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); - return attributesDao.removeAll(tenantId, entityId, scope, keys); - } - - private static void validate(EntityId id, String scope) { - Validator.validateId(id.getId(), "Incorrect id " + id); - Validator.validateString(scope, "Incorrect scope " + scope); - } - - private static void validate(AttributeKvEntry kvEntry) { - if (kvEntry == null) { - throw new IncorrectParameterException("Key value entry can't be null"); - } else if (kvEntry.getDataType() == null) { - throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null"); - } else { - Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty"); - Validator.validatePositiveNumber(kvEntry.getLastUpdateTs(), "Incorrect last update ts. Ts should be positive"); - } + return attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java new file mode 100644 index 0000000000..22e9a95bd6 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -0,0 +1,194 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.stats.DefaultCounter; +import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.dao.service.Validator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE; +import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; + +@Service +@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "true") +@Primary +@Slf4j +public class CachedAttributesService implements AttributesService { + private static final String STATS_NAME = "attributes.cache"; + + private final AttributesDao attributesDao; + private final Cache attributesCache; + + private final DefaultCounter hitCounter; + private final DefaultCounter missCounter; + + public CachedAttributesService(AttributesDao attributesDao, + CacheManager cacheManager, + StatsFactory statsFactory) { + this.attributesDao = attributesDao; + this.attributesCache = cacheManager.getCache(ATTRIBUTES_CACHE); + + this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit"); + this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); + } + + @Override + public ListenableFuture> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) { + validate(entityId, scope); + Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); + + AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey); + Cache.ValueWrapper cachedAttributeValue = attributesCache.get(attributeCacheKey); + if (cachedAttributeValue != null) { + hitCounter.increment(); + AttributeKvEntry cachedAttributeKvEntry = (AttributeKvEntry) cachedAttributeValue.get(); + return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry)); + } else { + missCounter.increment(); + ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, attributeKey); + return Futures.transform(result, foundAttrKvEntry -> { + // TODO: think if it's a good idea to store 'empty' attributes + attributesCache.put(attributeKey, foundAttrKvEntry.orElse(null)); + return foundAttrKvEntry; + }, MoreExecutors.directExecutor()); + } + } + + @Override + public ListenableFuture> find(TenantId tenantId, EntityId entityId, String scope, Collection attributeKeys) { + validate(entityId, scope); + attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey)); + + Map wrappedCachedAttributes = findCachedAttributes(entityId, scope, attributeKeys); + + List cachedAttributes = wrappedCachedAttributes.values().stream() + .map(wrappedCachedAttribute -> (AttributeKvEntry) wrappedCachedAttribute.get()) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (wrappedCachedAttributes.size() == attributeKeys.size()) { + return Futures.immediateFuture(cachedAttributes); + } + + ArrayList notFoundAttributeKeys = new ArrayList<>(attributeKeys); + notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); + + ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); + return Futures.transform(result, foundInDbAttributes -> { + return mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes); + }, MoreExecutors.directExecutor()); + + } + + private Map findCachedAttributes(EntityId entityId, String scope, Collection attributeKeys) { + Map cachedAttributes = new HashMap<>(); + for (String attributeKey : attributeKeys) { + Cache.ValueWrapper cachedAttributeValue = attributesCache.get(new AttributeCacheKey(scope, entityId, attributeKey)); + if (cachedAttributeValue != null) { + hitCounter.increment(); + cachedAttributes.put(attributeKey, cachedAttributeValue); + } else { + missCounter.increment(); + } + } + return cachedAttributes; + } + + private List mergeDbAndCacheAttributes(EntityId entityId, String scope, List cachedAttributes, ArrayList notFoundAttributeKeys, List foundInDbAttributes) { + for (AttributeKvEntry foundInDbAttribute : foundInDbAttributes) { + AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey()); + attributesCache.put(attributeCacheKey, foundInDbAttribute); + notFoundAttributeKeys.remove(foundInDbAttribute.getKey()); + } + for (String key : notFoundAttributeKeys){ + attributesCache.put(new AttributeCacheKey(scope, entityId, key), null); + } + List mergedAttributes = new ArrayList<>(cachedAttributes); + mergedAttributes.addAll(foundInDbAttributes); + return mergedAttributes; + } + + @Override + public ListenableFuture> findAll(TenantId tenantId, EntityId entityId, String scope) { + validate(entityId, scope); + return attributesDao.findAll(tenantId, entityId, scope); + } + + @Override + public List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { + return attributesDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId); + } + + @Override + public List findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List entityIds) { + return attributesDao.findAllKeysByEntityIds(tenantId, entityType, entityIds); + } + + @Override + public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { + validate(entityId, scope); + attributes.forEach(AttributeUtils::validate); + + List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); + ListenableFuture> future = Futures.allAsList(saveFutures); + + // TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache + List attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor()); + return future; + } + + @Override + public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { + validate(entityId, scope); + ListenableFuture> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor()); + return future; + } + + private void evictAttributesFromCache(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { + try { + for (String attributeKey : attributeKeys) { + attributesCache.evict(new AttributeCacheKey(scope, entityId, attributeKey)); + } + } catch (Exception e) { + log.error("[{}][{}] Failed to remove values from cache.", tenantId, entityId, e); + } + } +}