diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 42191e57a9..52f6395a13 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -145,7 +145,8 @@ cassandra: rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" tenant_rate_limits: enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}" - configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_VALUE:1000:1,30000:60}" + configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}" + print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}" # SQL configuration parameters sql: diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java index 05370efce3..be40bf49e0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java @@ -17,21 +17,24 @@ package org.thingsboard.server.dao.nosql; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.tools.TbRateLimits; +import org.thingsboard.server.dao.entity.EntityService; +import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; import org.thingsboard.server.dao.util.AsyncTaskContext; import org.thingsboard.server.dao.util.NoSqlAnyDao; import javax.annotation.PreDestroy; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; /** * Created by ashvayka on 24.10.18. @@ -41,6 +44,12 @@ import java.util.concurrent.ConcurrentMap; @NoSqlAnyDao public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor { + @Autowired + private EntityService entityService; + private Map tenantNamesCache = new HashMap<>(); + + private boolean printTenantNames; + public CassandraBufferedRateExecutor( @Value("${cassandra.query.buffer_size}") int queueLimit, @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, @@ -49,17 +58,37 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor< @Value("${cassandra.query.callback_threads:2}") int callbackThreads, @Value("${cassandra.query.poll_ms:50}") long pollMs, @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, - @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration) { + @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, + @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames) { super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration); + this.printTenantNames = printTenantNames; } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") public void printStats() { - log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] totalRateLimited [{}] currBuffer [{}] ", + log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] " + + "totalRateLimited [{}] totalRateLimitedTenants [{}] currBuffer [{}] ", getQueueSize(), totalAdded.getAndSet(0), totalLaunched.getAndSet(0), totalReleased.getAndSet(0), totalFailed.getAndSet(0), totalExpired.getAndSet(0), totalRejected.getAndSet(0), - totalRateLimited.getAndSet(0), concurrencyLevel.get()); + totalRateLimited.getAndSet(0), rateLimitedTenants.size(), concurrencyLevel.get()); + + rateLimitedTenants.forEach(((tenantId, counter) -> { + if (printTenantNames) { + String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> { + try { + return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get(); + } catch (Exception e) { + log.error("[{}] Failed to get tenant name", tenantId, e); + return "N/A"; + } + }); + log.info("[{}][{}] Rate limited requests: {}", tenantId, name, counter); + } else { + log.info("[{}] Rate limited requests: {}", tenantId, counter); + } + })); + rateLimitedTenants.clear(); } @PreDestroy diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index c4c9b60504..65848fca14 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.tools.TbRateLimits; import javax.annotation.Nullable; +import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -36,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Created by ashvayka on 24.10.18. @@ -53,6 +55,7 @@ public abstract class AbstractBufferedRateExecutor perTenantLimits = new ConcurrentHashMap<>(); + protected final ConcurrentMap rateLimitedTenants = new ConcurrentHashMap<>(); protected final AtomicInteger concurrencyLevel = new AtomicInteger(); protected final AtomicInteger totalAdded = new AtomicInteger(); @@ -90,6 +93,7 @@ public abstract class AbstractBufferedRateExecutor new TbRateLimits(perTenantLimitsConfiguration)); if (!rateLimits.tryConsume()) { + rateLimitedTenants.computeIfAbsent(task.getTenantId(), tId -> new AtomicInteger(0)).incrementAndGet(); totalRateLimited.incrementAndGet(); settableFuture.setException(new TenantRateLimitException()); perTenantLimitReached = true;