diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java index c3444d43f7..751bde6303 100644 --- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java +++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java @@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; import springfox.documentation.swagger2.annotations.EnableSwagger2; import java.util.Arrays; @@ -26,6 +27,7 @@ import java.util.Arrays; @SpringBootConfiguration @EnableAsync @EnableSwagger2 +@EnableScheduling @ComponentScan({"org.thingsboard.server"}) public class ThingsboardServerApplication { diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java index 09aaf80dee..f6bf54d349 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java @@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.PluginMetaData; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; +import org.thingsboard.server.common.msg.session.MsgType; import org.thingsboard.server.extensions.api.plugins.Plugin; import org.thingsboard.server.extensions.api.plugins.PluginInitializationException; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg; import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg; @@ -98,7 +103,20 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException { if (state == ComponentLifecycleState.ACTIVE) { - pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); + try { + pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg()); + } catch (Exception ex) { + logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex); + RuleToPluginMsg ruleMsg = msg.getMsg(); + MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR; + Integer requestId = 0; + if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) { + requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId(); + } + trustedCtx.reply( + new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(), + BasicStatusCodeResponse.onError(responceMsgType, requestId, ex))); + } } else { //TODO: reply with plugin suspended message } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 1df2f1cf31..fc04a193e1 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -181,6 +181,10 @@ cassandra: default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" + buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" + concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" + permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}" queue: msg.ttl: 604800 # 7 days @@ -234,7 +238,7 @@ caffeine: specs: relations: timeToLiveInMinutes: 1440 - maxSize: 0 + maxSize: 100000 deviceCredentials: timeToLiveInMinutes: 1440 maxSize: 100000 diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java index ace51c03a7..87708a759f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcRequestMsg.java @@ -16,16 +16,16 @@ package org.thingsboard.server.common.msg.core; import lombok.Data; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; import org.thingsboard.server.common.msg.session.MsgType; /** * @author Andrew Shvayka */ @Data -public class ToServerRpcRequestMsg implements FromDeviceMsg { +public class ToServerRpcRequestMsg implements FromDeviceRequestMsg { - private final int requestId; + private final Integer requestId; private final String method; private final String params; diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java index 4f923fe808..64ec718fdd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java @@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao>() { @Nullable @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java index 932d6b9147..8ae9dc8ad4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java @@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) .and(eq(ATTRIBUTE_KEY_COLUMN, key)); log.debug("Remove request: {}", delete.toString()); - return getFuture(getSession().executeAsync(delete), rs -> null); + return getFuture(executeAsyncWrite(delete), rs -> null); } private PreparedStatement getSaveStmt() { if (saveStmt == null) { - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + + saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF + "(" + ENTITY_TYPE_COLUMN + "," + ENTITY_ID_COLUMN + "," + ATTRIBUTE_TYPE_COLUMN + diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java index 27f7adc669..fd02b5f880 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java @@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao>() { @Nullable @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java index c2f709f05b..ba186ccebe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.model.type.*; +import org.thingsboard.server.dao.util.BufferedRateLimiter; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -33,16 +34,15 @@ public abstract class CassandraAbstractDao { private ConcurrentMap preparedStatementMap = new ConcurrentHashMap<>(); - protected PreparedStatement prepare(String query) { - return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i)); - } + @Autowired + private BufferedRateLimiter rateLimiter; private Session session; private ConsistencyLevel defaultReadLevel; private ConsistencyLevel defaultWriteLevel; - protected Session getSession() { + private Session getSession() { if (session == null) { session = cluster.getSession(); defaultReadLevel = cluster.getDefaultReadConsistencyLevel(); @@ -59,6 +59,10 @@ public abstract class CassandraAbstractDao { return session; } + protected PreparedStatement prepare(String query) { + return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i)); + } + private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec codec) { try { registry.codecFor(codec.getCqlType(), codec.getJavaType()); @@ -85,10 +89,7 @@ public abstract class CassandraAbstractDao { private ResultSet execute(Statement statement, ConsistencyLevel level) { log.debug("Execute cassandra statement {}", statement); - if (statement.getConsistencyLevel() == null) { - statement.setConsistencyLevel(level); - } - return getSession().execute(statement); + return executeAsync(statement, level).getUninterruptibly(); } private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) { @@ -96,6 +97,6 @@ public abstract class CassandraAbstractDao { if (statement.getConsistencyLevel() == null) { statement.setConsistencyLevel(level); } - return getSession().executeAsync(statement); + return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement); } } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java index 7e87fa8fec..47d43ba57f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java @@ -63,7 +63,7 @@ public abstract class CassandraAbstractModelDao, D> exte List list = Collections.emptyList(); if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSet resultSet = getSession().execute(statement); + ResultSet resultSet = executeRead(statement); Result result = getMapper().map(resultSet); if (result != null) { list = result.all(); @@ -75,7 +75,7 @@ public abstract class CassandraAbstractModelDao, D> exte protected ListenableFuture> findListByStatementAsync(Statement statement) { if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); + ResultSetFuture resultSetFuture = executeAsyncRead(statement); return Futures.transform(resultSetFuture, new Function>() { @Nullable @Override @@ -97,7 +97,7 @@ public abstract class CassandraAbstractModelDao, D> exte E object = null; if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSet resultSet = getSession().execute(statement); + ResultSet resultSet = executeRead(statement); Result result = getMapper().map(resultSet); if (result != null) { object = result.one(); @@ -109,7 +109,7 @@ public abstract class CassandraAbstractModelDao, D> exte protected ListenableFuture findOneByStatementAsync(Statement statement) { if (statement != null) { statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); - ResultSetFuture resultSetFuture = getSession().executeAsync(statement); + ResultSetFuture resultSetFuture = executeAsyncRead(statement); return Futures.transform(resultSetFuture, new Function() { @Nullable @Override @@ -184,7 +184,7 @@ public abstract class CassandraAbstractModelDao, D> exte public boolean removeById(UUID key) { Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key)); log.debug("Remove request: {}", delete.toString()); - return getSession().execute(delete).wasApplied(); + return executeWrite(delete).wasApplied(); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java new file mode 100644 index 0000000000..2674c6ddea --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java @@ -0,0 +1,148 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.base.Function; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; +import org.thingsboard.server.dao.util.AsyncRateLimiter; + +import javax.annotation.Nullable; +import java.util.concurrent.*; + +public class RateLimitedResultSetFuture implements ResultSetFuture { + + private final ListenableFuture originalFuture; + private final ListenableFuture rateLimitFuture; + + public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) { + this.rateLimitFuture = rateLimiter.acquireAsync(); + this.originalFuture = Futures.transform(rateLimitFuture, + (Function) i -> executeAsyncWithRelease(rateLimiter, session, statement)); + } + + @Override + public ResultSet getUninterruptibly() { + return safeGet().getUninterruptibly(); + } + + @Override + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException { + long rateLimitStart = System.nanoTime(); + ResultSetFuture resultSetFuture = null; + try { + resultSetFuture = originalFuture.get(timeout, unit); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + long rateLimitDurationNano = System.nanoTime() - rateLimitStart; + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano; + if (innerTimeoutNano > 0) { + return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS); + } + throw new TimeoutException("Timeout waiting for task."); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (originalFuture.isDone()) { + return safeGet().cancel(mayInterruptIfRunning); + } else { + return originalFuture.cancel(mayInterruptIfRunning); + } + } + + @Override + public boolean isCancelled() { + if (originalFuture.isDone()) { + return safeGet().isCancelled(); + } + + return originalFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return originalFuture.isDone() && safeGet().isDone(); + } + + @Override + public ResultSet get() throws InterruptedException, ExecutionException { + return safeGet().get(); + } + + @Override + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long rateLimitStart = System.nanoTime(); + ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit); + long rateLimitDurationNano = System.nanoTime() - rateLimitStart; + long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano; + if (innerTimeoutNano > 0) { + return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS); + } + throw new TimeoutException("Timeout waiting for task."); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + originalFuture.addListener(() -> { + try { + ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture); + resultSetFuture.addListener(listener, executor); + } catch (CancellationException e) { + cancel(false); + return; + } catch (ExecutionException e) { + Futures.immediateFailedFuture(e).addListener(listener, executor); + } + }, executor); + } + + private ResultSetFuture safeGet() { + try { + return originalFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + + private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) { + try { + ResultSetFuture resultSetFuture = session.executeAsync(statement); + Futures.addCallback(resultSetFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable ResultSet result) { + rateLimiter.release(); + } + + @Override + public void onFailure(Throwable t) { + rateLimiter.release(); + } + }); + return resultSetFuture; + } catch (RuntimeException re) { + rateLimiter.release(); + throw re; + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java index 9e252412b2..55838d6462 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java @@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getSaveStmt() { if (saveStmt == null) { - saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + + saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + "(" + ModelConstants.RELATION_FROM_ID_PROPERTY + "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY + "," + ModelConstants.RELATION_TO_ID_PROPERTY + @@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getDeleteStmt() { if (deleteStmt == null) { - deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + + deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" + AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" + @@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getDeleteAllByEntityStmt() { if (deleteAllByEntityStmt == null) { - deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + + deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?"); } @@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByFromStmt() { if (findAllByFromStmt == null) { - findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByFromStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByFromAndTypeStmt() { if (findAllByFromAndTypeStmt == null) { - findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByToStmt() { if (findAllByToStmt == null) { - findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByToStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getFindAllByToAndTypeStmt() { if (findAllByToAndTypeStmt == null) { - findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " + + findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " + WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM + @@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati private PreparedStatement getCheckRelationStmt() { if (checkRelationStmt == null) { - checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " + + checkRelationStmt = prepare(SELECT_COLUMNS + " " + FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " + WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM + AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM + diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 01f60f8dff..370d770188 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -82,8 +82,9 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @@ -95,8 +96,9 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @@ -108,11 +110,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @Override public boolean deleteRelation(EntityRelation relation) { @@ -122,11 +124,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup}") }) @Override public ListenableFuture deleteRelationAsync(EntityRelation relation) { @@ -136,11 +138,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}") }) @Override public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { @@ -150,11 +152,11 @@ public class BaseRelationService implements RelationService { } @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}") + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup}"), + @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}") }) @Override public ListenableFuture deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index d620e11f1d..cda4b1669b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement partitionInsertStmt; private PreparedStatement partitionInsertTtlStmt; - private PreparedStatement[] latestInsertStmts; + private PreparedStatement latestInsertStmt; private PreparedStatement[] saveStmts; private PreparedStatement[] saveTtlStmts; private PreparedStatement[] fetchStmts; @@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { - DataType type = tsKvEntry.getDataType(); - BoundStatement stmt = getLatestStmt(type).bind() + BoundStatement stmt = getLatestStmt().bind() .setString(0, entityId.getEntityType().name()) .setUUID(1, entityId.getId()) .setString(2, tsKvEntry.getKey()) - .setLong(3, tsKvEntry.getTs()); - addValue(tsKvEntry, stmt, 4); + .setLong(3, tsKvEntry.getTs()) + .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class) + .set(5, tsKvEntry.getStrValue().orElse(null), String.class) + .set(6, tsKvEntry.getLongValue().orElse(null), Long.class) + .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class); return getFuture(executeAsyncWrite(stmt), rs -> null); } @@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (saveStmts == null) { saveStmts = new PreparedStatement[DataType.values().length]; for (DataType type : DataType.values()) { - saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.KEY_COLUMN + @@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (saveTtlStmts == null) { saveTtlStmts = new PreparedStatement[DataType.values().length]; for (DataType type : DataType.values()) { - saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF + + saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.KEY_COLUMN + @@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) { fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()]; } else { - fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX + + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM @@ -435,26 +437,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return fetchStmts[aggType.ordinal()]; } - private PreparedStatement getLatestStmt(DataType dataType) { - if (latestInsertStmts == null) { - latestInsertStmts = new PreparedStatement[DataType.values().length]; - for (DataType type : DataType.values()) { - latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + - "(" + ModelConstants.ENTITY_TYPE_COLUMN + - "," + ModelConstants.ENTITY_ID_COLUMN + - "," + ModelConstants.KEY_COLUMN + - "," + ModelConstants.TS_COLUMN + - "," + getColumnName(type) + ")" + - " VALUES(?, ?, ?, ?, ?)"); - } + private PreparedStatement getLatestStmt() { + if (latestInsertStmt == null) { + latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + ModelConstants.BOOLEAN_VALUE_COLUMN + + "," + ModelConstants.STRING_VALUE_COLUMN + + "," + ModelConstants.LONG_VALUE_COLUMN + + "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?)"); } - return latestInsertStmts[dataType.ordinal()]; + return latestInsertStmt; } private PreparedStatement getPartitionInsertStmt() { if (partitionInsertStmt == null) { - partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + + partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.PARTITION_COLUMN + @@ -466,7 +468,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getPartitionInsertTtlStmt() { if (partitionInsertTtlStmt == null) { - partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + + partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF + "(" + ModelConstants.ENTITY_TYPE_COLUMN + "," + ModelConstants.ENTITY_ID_COLUMN + "," + ModelConstants.PARTITION_COLUMN + @@ -479,7 +481,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getFindLatestStmt() { if (findLatestStmt == null) { - findLatestStmt = getSession().prepare(SELECT_PREFIX + + findLatestStmt = prepare(SELECT_PREFIX + ModelConstants.KEY_COLUMN + "," + ModelConstants.TS_COLUMN + "," + ModelConstants.STRING_VALUE_COLUMN + "," + @@ -496,7 +498,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private PreparedStatement getFindAllLatestStmt() { if (findAllLatestStmt == null) { - findAllLatestStmt = getSession().prepare(SELECT_PREFIX + + findAllLatestStmt = prepare(SELECT_PREFIX + ModelConstants.KEY_COLUMN + "," + ModelConstants.TS_COLUMN + "," + ModelConstants.STRING_VALUE_COLUMN + "," + diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java new file mode 100644 index 0000000000..6fb21d6adb --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.ListenableFuture; + +public interface AsyncRateLimiter { + + ListenableFuture acquireAsync(); + + void release(); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java new file mode 100644 index 0000000000..2acd623a37 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java @@ -0,0 +1,164 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +@Component +@Slf4j +@NoSqlDao +public class BufferedRateLimiter implements AsyncRateLimiter { + + private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + + private final int permitsLimit; + private final int maxPermitWaitTime; + private final AtomicInteger permits; + private final BlockingQueue queue; + + private final AtomicInteger maxQueueSize = new AtomicInteger(); + private final AtomicInteger maxGrantedPermissions = new AtomicInteger(); + + public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit, + @Value("${cassandra.query.concurrent_limit}") int permitsLimit, + @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) { + this.permitsLimit = permitsLimit; + this.maxPermitWaitTime = maxPermitWaitTime; + this.permits = new AtomicInteger(); + this.queue = new LinkedBlockingQueue<>(queueLimit); + } + + @Override + public ListenableFuture acquireAsync() { + if (queue.isEmpty()) { + if (permits.incrementAndGet() <= permitsLimit) { + if (permits.get() > maxGrantedPermissions.get()) { + maxGrantedPermissions.set(permits.get()); + } + return Futures.immediateFuture(null); + } + permits.decrementAndGet(); + } + + return putInQueue(); + } + + @Override + public void release() { + permits.decrementAndGet(); + reprocessQueue(); + } + + private void reprocessQueue() { + while (permits.get() < permitsLimit) { + if (permits.incrementAndGet() <= permitsLimit) { + if (permits.get() > maxGrantedPermissions.get()) { + maxGrantedPermissions.set(permits.get()); + } + LockedFuture lockedFuture = queue.poll(); + if (lockedFuture != null) { + lockedFuture.latch.countDown(); + } else { + permits.decrementAndGet(); + break; + } + } else { + permits.decrementAndGet(); + } + } + } + + private LockedFuture createLockedFuture() { + CountDownLatch latch = new CountDownLatch(1); + ListenableFuture future = pool.submit(() -> { + latch.await(); + return null; + }); + return new LockedFuture(latch, future, System.currentTimeMillis()); + } + + private ListenableFuture putInQueue() { + + int size = queue.size(); + if (size > maxQueueSize.get()) { + maxQueueSize.set(size); + } + + if (queue.remainingCapacity() > 0) { + try { + LockedFuture lockedFuture = createLockedFuture(); + if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) { + lockedFuture.cancelFuture(); + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + } + if(permits.get() < permitsLimit) { + reprocessQueue(); + } + return lockedFuture.future; + } catch (InterruptedException e) { + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); + } + } + return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + } + + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") + public void printStats() { + int expiredCount = 0; + for (LockedFuture lockedFuture : queue) { + if (lockedFuture.isExpired()) { + lockedFuture.cancelFuture(); + expiredCount++; + } + } + log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), + maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); + } + + private class LockedFuture { + final CountDownLatch latch; + final ListenableFuture future; + final long createTime; + + public LockedFuture(CountDownLatch latch, ListenableFuture future, long createTime) { + this.latch = latch; + this.future = future; + this.createTime = createTime; + } + + void cancelFuture() { + future.cancel(false); + latch.countDown(); + } + + boolean isExpired() { + return (System.currentTimeMillis() - createTime) > maxPermitWaitTime; + } + + } + + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java new file mode 100644 index 0000000000..fa62c2b9b0 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java @@ -0,0 +1,156 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.UnsupportedFeatureException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.thingsboard.server.dao.util.AsyncRateLimiter; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class RateLimitedResultSetFutureTest { + + private RateLimitedResultSetFuture resultSetFuture; + + @Mock + private AsyncRateLimiter rateLimiter; + @Mock + private Session session; + @Mock + private Statement statement; + @Mock + private ResultSetFuture realFuture; + @Mock + private ResultSet rows; + @Mock + private Row row; + + @Test + public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException())); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + Thread.sleep(1000L); + verify(rateLimiter).acquireAsync(); + try { + assertTrue(resultSetFuture.isDone()); + fail(); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + Throwable actualCause = e.getCause(); + assertTrue(actualCause instanceof ExecutionException); + } + verifyNoMoreInteractions(session, rateLimiter, statement); + + } + + @Test + public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.getUninterruptibly()).thenReturn(rows); + + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ResultSet actual = resultSetFuture.getUninterruptibly(); + assertSame(rows, actual); + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.get()).thenReturn(rows); + when(rows.one()).thenReturn(row); + + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + Row actualRow = transform.get(); + + assertSame(row, actualRow); + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg")); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + try { + transform.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + } + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + + @Test + public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException { + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); + when(session.executeAsync(statement)).thenReturn(realFuture); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + task.run(); + return null; + }).when(realFuture).addListener(Mockito.any(), Mockito.any()); + + when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout"))); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + try { + transform.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + } + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + +} \ No newline at end of file diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java new file mode 100644 index 0000000000..5bfc3b6e95 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import com.google.common.util.concurrent.*; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + + +public class BufferedRateLimiterTest { + + @Test + public void finishedFutureReturnedIfPermitsAreGranted() { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100); + ListenableFuture actual = limiter.acquireAsync(); + assertTrue(actual.isDone()); + } + + @Test + public void notFinishedFutureReturnedIfPermitsAreNotGranted() { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + assertTrue(actual1.isDone()); + assertFalse(actual2.isDone()); + } + + @Test + public void failedFutureReturnedIfQueueIsfull() { + BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + ListenableFuture actual3 = limiter.acquireAsync(); + + assertTrue(actual1.isDone()); + assertFalse(actual2.isDone()); + assertTrue(actual3.isDone()); + try { + actual3.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + Throwable actualCause = e.getCause(); + assertTrue(actualCause instanceof IllegalStateException); + assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage()); + } + } + + @Test + public void releasedPermitTriggerTasksFromQueue() throws InterruptedException { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); + ListenableFuture actual1 = limiter.acquireAsync(); + ListenableFuture actual2 = limiter.acquireAsync(); + ListenableFuture actual3 = limiter.acquireAsync(); + ListenableFuture actual4 = limiter.acquireAsync(); + assertTrue(actual1.isDone()); + assertTrue(actual2.isDone()); + assertFalse(actual3.isDone()); + assertFalse(actual4.isDone()); + limiter.release(); + TimeUnit.MILLISECONDS.sleep(100L); + assertTrue(actual3.isDone()); + assertFalse(actual4.isDone()); + limiter.release(); + TimeUnit.MILLISECONDS.sleep(100L); + assertTrue(actual4.isDone()); + } + + @Test + public void permitsReleasedInConcurrentMode() throws InterruptedException { + BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); + AtomicInteger actualReleased = new AtomicInteger(); + AtomicInteger actualRejected = new AtomicInteger(); + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); + for (int i = 0; i < 100; i++) { + ListenableFuture> submit = pool.submit(limiter::acquireAsync); + Futures.addCallback(submit, new FutureCallback>() { + @Override + public void onSuccess(@Nullable ListenableFuture result) { + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + limiter.release(); + actualReleased.incrementAndGet(); + } + + @Override + public void onFailure(Throwable t) { + actualRejected.incrementAndGet(); + } + }); + } + + @Override + public void onFailure(Throwable t) { + } + }); + } + + TimeUnit.SECONDS.sleep(2); + assertTrue("Unexpected released count " + actualReleased.get(), + actualReleased.get() > 10 && actualReleased.get() < 20); + assertTrue("Unexpected rejected count " + actualRejected.get(), + actualRejected.get() > 80 && actualRejected.get() < 90); + + } + + +} \ No newline at end of file diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 82fcbe1949..737687f053 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000 cassandra.query.ts_key_value_partitioning=HOURS cassandra.query.max_limit_per_request=1000 +cassandra.query.buffer_size=100000 +cassandra.query.concurrent_limit=1000 +cassandra.query.permit_max_wait_time=20000 +cassandra.query.rate_limit_print_interval_ms=30000 + diff --git a/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js b/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js index bf4886c994..e1d2519ae3 100644 --- a/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js +++ b/ui/src/app/extension/extensions-forms/extension-form-opc.directive.js @@ -128,8 +128,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra let addedFile = event.target.result; if (addedFile && addedFile.length > 0) { - model[options.fileName] = $file.name; - model[options.file] = addedFile.replace(/^data.*base64,/, ""); + model[options.location] = $file.name; + model[options.fileContent] = addedFile.replace(/^data.*base64,/, ""); } } @@ -142,8 +142,8 @@ export default function ExtensionFormOpcDirective($compile, $templateCache, $tra scope.clearFile = function(model, options) { scope.theForm.$setDirty(); - model[options.fileName] = null; - model[options.file] = null; + model[options.location] = null; + model[options.fileContent] = null; }; diff --git a/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html b/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html index 501eeebd07..5a7c00ba3b 100644 --- a/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html +++ b/ui/src/app/extension/extensions-forms/extension-form-opc.tpl.html @@ -212,8 +212,8 @@