diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java
index d4e29f02de..ef0e580e34 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java
@@ -19,10 +19,12 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
@@ -34,6 +36,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.cache.CacheExecutorService;
import org.thingsboard.server.dao.service.Validator;
+import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -43,6 +46,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@@ -53,12 +57,17 @@ import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@Slf4j
public class CachedAttributesService implements AttributesService {
private static final String STATS_NAME = "attributes.cache";
+ public static final String LOCAL_CACHE_TYPE = "caffeine";
private final AttributesDao attributesDao;
private final AttributesCacheWrapper cacheWrapper;
+ private final CacheExecutorService cacheExecutorService;
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
- private final CacheExecutorService cacheExecutorService;
+ private Executor cacheExecutor;
+
+ @Value("${cache.type}")
+ private String cacheType;
public CachedAttributesService(AttributesDao attributesDao,
AttributesCacheWrapper cacheWrapper,
@@ -72,6 +81,25 @@ public class CachedAttributesService implements AttributesService {
this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss");
}
+ @PostConstruct
+ public void init() {
+ this.cacheExecutor = getExecutor(cacheType, cacheExecutorService);
+ }
+
+ /**
+ * Will return:
+ * - for the local cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread)
+ * - for the remote cache: dedicated thread pool for the cache IO calls to unblock any caller thread
+ * */
+ Executor getExecutor(String cacheType, CacheExecutorService cacheExecutorService) {
+ if (StringUtils.isEmpty(cacheType) || LOCAL_CACHE_TYPE.equals(cacheType)) {
+ log.info("Going to use directExecutor for the local cache type {}", cacheType);
+ return MoreExecutors.directExecutor();
+ }
+ log.info("Going to use cacheExecutorService for the remote cache type {}", cacheType);
+ return cacheExecutorService;
+ }
+
@Override
public ListenableFuture> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) {
validate(entityId, scope);
@@ -90,7 +118,7 @@ public class CachedAttributesService implements AttributesService {
// TODO: think if it's a good idea to store 'empty' attributes
cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null));
return foundAttrKvEntry;
- }, cacheExecutorService);
+ }, cacheExecutor);
}
}
@@ -113,7 +141,7 @@ public class CachedAttributesService implements AttributesService {
notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet());
ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
- return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService);
+ return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutor);
}
@@ -171,7 +199,7 @@ public class CachedAttributesService implements AttributesService {
// TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache
List attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
- future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
+ future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor);
return future;
}
@@ -179,7 +207,7 @@ public class CachedAttributesService implements AttributesService {
public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) {
validate(entityId, scope);
ListenableFuture> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
- future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
+ future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor);
return future;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java
index ca00e41d79..9f0f54bb46 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java
@@ -1,32 +1,17 @@
/**
- * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
+ * Copyright © 2016-2021 The Thingsboard Authors
*
- * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * NOTICE: All information contained herein is, and remains
- * the property of ThingsBoard, Inc. and its suppliers,
- * if any. The intellectual and technical concepts contained
- * herein are proprietary to ThingsBoard, Inc.
- * and its suppliers and may be covered by U.S. and Foreign Patents,
- * patents in process, and are protected by trade secret or copyright law.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Dissemination of this information or reproduction of this material is strictly forbidden
- * unless prior written permission is obtained from COMPANY.
- *
- * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
- * managers or contractors who have executed Confidentiality and Non-disclosure agreements
- * explicitly covering such access.
- *
- * The copyright notice above does not evidence any actual or intended publication
- * or disclosure of this source code, which includes
- * information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
- * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
- * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
- * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
- * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
- * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
- * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
- * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.thingsboard.server.dao.cache;
diff --git a/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java
index af980a6301..6a3429e5fb 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java
@@ -1,38 +1,22 @@
/**
- * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
+ * Copyright © 2016-2021 The Thingsboard Authors
*
- * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * NOTICE: All information contained herein is, and remains
- * the property of ThingsBoard, Inc. and its suppliers,
- * if any. The intellectual and technical concepts contained
- * herein are proprietary to ThingsBoard, Inc.
- * and its suppliers and may be covered by U.S. and Foreign Patents,
- * patents in process, and are protected by trade secret or copyright law.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Dissemination of this information or reproduction of this material is strictly forbidden
- * unless prior written permission is obtained from COMPANY.
- *
- * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
- * managers or contractors who have executed Confidentiality and Non-disclosure agreements
- * explicitly covering such access.
- *
- * The copyright notice above does not evidence any actual or intended publication
- * or disclosure of this source code, which includes
- * information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
- * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
- * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
- * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
- * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
- * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
- * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
- * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
-import org.thingsboard.server.dao.cache.CacheConfiguration;
import org.thingsboard.server.dao.cache.CacheExecutorService;
import static org.hamcrest.CoreMatchers.is;
@@ -58,15 +42,11 @@ public class CachedAttributesServiceTest {
assertThat(cachedAttributesService.getExecutor(null, cacheExecutorService), is(MoreExecutors.directExecutor()));
- CacheConfiguration cacheConfiguration = new CacheConfiguration();
- cacheConfiguration.setType(null);
- assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor()));
+ assertThat(cachedAttributesService.getExecutor((String) null, cacheExecutorService), is(MoreExecutors.directExecutor()));
- cacheConfiguration.setType("");
- assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor()));
+ assertThat(cachedAttributesService.getExecutor("", cacheExecutorService), is(MoreExecutors.directExecutor()));
- cacheConfiguration.setType(CachedAttributesService.LOCAL_CACHE_TYPE);
- assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor()));
+ assertThat(cachedAttributesService.getExecutor(CachedAttributesService.LOCAL_CACHE_TYPE, cacheExecutorService), is(MoreExecutors.directExecutor()));
}
@@ -74,14 +54,11 @@ public class CachedAttributesServiceTest {
public void givenCacheType_whenGetExecutor_thenReturnCacheExecutorService() {
CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class);
CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class);
- willCallRealMethod().given(cachedAttributesService).getExecutor(any(CacheConfiguration.class), any(CacheExecutorService.class));
+ willCallRealMethod().given(cachedAttributesService).getExecutor(any(String.class), any(CacheExecutorService.class));
- CacheConfiguration cacheConfiguration = new CacheConfiguration();
- cacheConfiguration.setType(REDIS);
- assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService));
+ assertThat(cachedAttributesService.getExecutor(REDIS, cacheExecutorService), is(cacheExecutorService));
- cacheConfiguration.setType("unknownCacheType");
- assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService));
+ assertThat(cachedAttributesService.getExecutor("unknownCacheType", cacheExecutorService), is(cacheExecutorService));
}