|
|
|
@ -17,21 +17,24 @@ package org.thingsboard.server.dao.nosql; |
|
|
|
|
|
|
|
import com.datastax.driver.core.ResultSet; |
|
|
|
import com.datastax.driver.core.ResultSetFuture; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
import com.google.common.util.concurrent.SettableFuture; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.scheduling.annotation.Scheduled; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.msg.tools.TbRateLimits; |
|
|
|
import org.thingsboard.server.dao.entity.EntityService; |
|
|
|
import org.thingsboard.server.dao.tenant.TenantService; |
|
|
|
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor; |
|
|
|
import org.thingsboard.server.dao.util.AsyncTaskContext; |
|
|
|
import org.thingsboard.server.dao.util.NoSqlAnyDao; |
|
|
|
|
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
|
|
|
|
/** |
|
|
|
* Created by ashvayka on 24.10.18. |
|
|
|
@ -41,6 +44,12 @@ import java.util.concurrent.ConcurrentMap; |
|
|
|
@NoSqlAnyDao |
|
|
|
public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, ResultSetFuture, ResultSet> { |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private EntityService entityService; |
|
|
|
private Map<TenantId, String> tenantNamesCache = new HashMap<>(); |
|
|
|
|
|
|
|
private boolean printTenantNames; |
|
|
|
|
|
|
|
public CassandraBufferedRateExecutor( |
|
|
|
@Value("${cassandra.query.buffer_size}") int queueLimit, |
|
|
|
@Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, |
|
|
|
@ -49,17 +58,37 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor< |
|
|
|
@Value("${cassandra.query.callback_threads:2}") int callbackThreads, |
|
|
|
@Value("${cassandra.query.poll_ms:50}") long pollMs, |
|
|
|
@Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, |
|
|
|
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration) { |
|
|
|
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, |
|
|
|
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames) { |
|
|
|
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration); |
|
|
|
this.printTenantNames = printTenantNames; |
|
|
|
} |
|
|
|
|
|
|
|
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") |
|
|
|
public void printStats() { |
|
|
|
log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] totalRateLimited [{}] currBuffer [{}] ", |
|
|
|
log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] " + |
|
|
|
"totalRateLimited [{}] totalRateLimitedTenants [{}] currBuffer [{}] ", |
|
|
|
getQueueSize(), |
|
|
|
totalAdded.getAndSet(0), totalLaunched.getAndSet(0), totalReleased.getAndSet(0), |
|
|
|
totalFailed.getAndSet(0), totalExpired.getAndSet(0), totalRejected.getAndSet(0), |
|
|
|
totalRateLimited.getAndSet(0), concurrencyLevel.get()); |
|
|
|
totalRateLimited.getAndSet(0), rateLimitedTenants.size(), concurrencyLevel.get()); |
|
|
|
|
|
|
|
rateLimitedTenants.forEach(((tenantId, counter) -> { |
|
|
|
if (printTenantNames) { |
|
|
|
String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> { |
|
|
|
try { |
|
|
|
return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get(); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("[{}] Failed to get tenant name", tenantId, e); |
|
|
|
return "N/A"; |
|
|
|
} |
|
|
|
}); |
|
|
|
log.info("[{}][{}] Rate limited requests: {}", tenantId, name, counter); |
|
|
|
} else { |
|
|
|
log.info("[{}] Rate limited requests: {}", tenantId, counter); |
|
|
|
} |
|
|
|
})); |
|
|
|
rateLimitedTenants.clear(); |
|
|
|
} |
|
|
|
|
|
|
|
@PreDestroy |
|
|
|
|