From e1c1e7ebbcb2d801b645488af4e474e3e4af08a6 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 30 Jun 2021 15:30:52 +0300 Subject: [PATCH] cache: CachedAttributesService fixed after merge to the latest master, fixed license headers --- .../attributes/CachedAttributesService.java | 38 +++++++++++-- .../dao/cache/CacheExecutorService.java | 35 ++++-------- .../CachedAttributesServiceTest.java | 55 ++++++------------- 3 files changed, 59 insertions(+), 69 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index d4e29f02de..ef0e580e34 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -19,10 +19,12 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.Cache; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; @@ -34,6 +36,7 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.cache.CacheExecutorService; import org.thingsboard.server.dao.service.Validator; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -43,6 +46,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import java.util.stream.Collectors; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @@ -53,12 +57,17 @@ import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @Slf4j public class CachedAttributesService implements AttributesService { private static final String STATS_NAME = "attributes.cache"; + public static final String LOCAL_CACHE_TYPE = "caffeine"; private final AttributesDao attributesDao; private final AttributesCacheWrapper cacheWrapper; + private final CacheExecutorService cacheExecutorService; private final DefaultCounter hitCounter; private final DefaultCounter missCounter; - private final CacheExecutorService cacheExecutorService; + private Executor cacheExecutor; + + @Value("${cache.type}") + private String cacheType; public CachedAttributesService(AttributesDao attributesDao, AttributesCacheWrapper cacheWrapper, @@ -72,6 +81,25 @@ public class CachedAttributesService implements AttributesService { this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); } + @PostConstruct + public void init() { + this.cacheExecutor = getExecutor(cacheType, cacheExecutorService); + } + + /** + * Will return: + * - for the local cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread) + * - for the remote cache: dedicated thread pool for the cache IO calls to unblock any caller thread + * */ + Executor getExecutor(String cacheType, CacheExecutorService cacheExecutorService) { + if (StringUtils.isEmpty(cacheType) || LOCAL_CACHE_TYPE.equals(cacheType)) { + log.info("Going to use directExecutor for the local cache type {}", cacheType); + return MoreExecutors.directExecutor(); + } + log.info("Going to use cacheExecutorService for the remote cache type {}", cacheType); + return cacheExecutorService; + } + @Override public ListenableFuture> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) { validate(entityId, scope); @@ -90,7 +118,7 @@ public class CachedAttributesService implements AttributesService { // TODO: think if it's a good idea to store 'empty' attributes cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null)); return foundAttrKvEntry; - }, cacheExecutorService); + }, cacheExecutor); } } @@ -113,7 +141,7 @@ public class CachedAttributesService implements AttributesService { notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); - return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService); + return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutor); } @@ -171,7 +199,7 @@ public class CachedAttributesService implements AttributesService { // TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache List attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor); return future; } @@ -179,7 +207,7 @@ public class CachedAttributesService implements AttributesService { public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); ListenableFuture> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor); return future; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java index ca00e41d79..9f0f54bb46 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java @@ -1,32 +1,17 @@ /** - * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * Copyright © 2016-2021 The Thingsboard Authors * - * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * NOTICE: All information contained herein is, and remains - * the property of ThingsBoard, Inc. and its suppliers, - * if any. The intellectual and technical concepts contained - * herein are proprietary to ThingsBoard, Inc. - * and its suppliers and may be covered by U.S. and Foreign Patents, - * patents in process, and are protected by trade secret or copyright law. + * http://www.apache.org/licenses/LICENSE-2.0 * - * Dissemination of this information or reproduction of this material is strictly forbidden - * unless prior written permission is obtained from COMPANY. - * - * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, - * managers or contractors who have executed Confidentiality and Non-disclosure agreements - * explicitly covering such access. - * - * The copyright notice above does not evidence any actual or intended publication - * or disclosure of this source code, which includes - * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. - * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, - * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT - * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, - * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. - * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION - * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, - * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.thingsboard.server.dao.cache; diff --git a/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java index af980a6301..6a3429e5fb 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java @@ -1,38 +1,22 @@ /** - * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * Copyright © 2016-2021 The Thingsboard Authors * - * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * NOTICE: All information contained herein is, and remains - * the property of ThingsBoard, Inc. and its suppliers, - * if any. The intellectual and technical concepts contained - * herein are proprietary to ThingsBoard, Inc. - * and its suppliers and may be covered by U.S. and Foreign Patents, - * patents in process, and are protected by trade secret or copyright law. + * http://www.apache.org/licenses/LICENSE-2.0 * - * Dissemination of this information or reproduction of this material is strictly forbidden - * unless prior written permission is obtained from COMPANY. - * - * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, - * managers or contractors who have executed Confidentiality and Non-disclosure agreements - * explicitly covering such access. - * - * The copyright notice above does not evidence any actual or intended publication - * or disclosure of this source code, which includes - * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. - * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, - * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT - * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, - * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. - * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION - * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, - * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.thingsboard.server.dao.attributes; import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; -import org.thingsboard.server.dao.cache.CacheConfiguration; import org.thingsboard.server.dao.cache.CacheExecutorService; import static org.hamcrest.CoreMatchers.is; @@ -58,15 +42,11 @@ public class CachedAttributesServiceTest { assertThat(cachedAttributesService.getExecutor(null, cacheExecutorService), is(MoreExecutors.directExecutor())); - CacheConfiguration cacheConfiguration = new CacheConfiguration(); - cacheConfiguration.setType(null); - assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor())); + assertThat(cachedAttributesService.getExecutor((String) null, cacheExecutorService), is(MoreExecutors.directExecutor())); - cacheConfiguration.setType(""); - assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor())); + assertThat(cachedAttributesService.getExecutor("", cacheExecutorService), is(MoreExecutors.directExecutor())); - cacheConfiguration.setType(CachedAttributesService.LOCAL_CACHE_TYPE); - assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor())); + assertThat(cachedAttributesService.getExecutor(CachedAttributesService.LOCAL_CACHE_TYPE, cacheExecutorService), is(MoreExecutors.directExecutor())); } @@ -74,14 +54,11 @@ public class CachedAttributesServiceTest { public void givenCacheType_whenGetExecutor_thenReturnCacheExecutorService() { CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class); CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class); - willCallRealMethod().given(cachedAttributesService).getExecutor(any(CacheConfiguration.class), any(CacheExecutorService.class)); + willCallRealMethod().given(cachedAttributesService).getExecutor(any(String.class), any(CacheExecutorService.class)); - CacheConfiguration cacheConfiguration = new CacheConfiguration(); - cacheConfiguration.setType(REDIS); - assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService)); + assertThat(cachedAttributesService.getExecutor(REDIS, cacheExecutorService), is(cacheExecutorService)); - cacheConfiguration.setType("unknownCacheType"); - assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService)); + assertThat(cachedAttributesService.getExecutor("unknownCacheType", cacheExecutorService), is(cacheExecutorService)); }