|
|
|
@ -79,6 +79,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; |
|
|
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|
|
|
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
|
|
|
import org.thingsboard.server.queue.provider.TbTransportQueueFactory; |
|
|
|
import org.thingsboard.server.queue.util.TbTransportComponent; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
@ -103,7 +104,7 @@ import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
*/ |
|
|
|
@Slf4j |
|
|
|
@Service |
|
|
|
@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") |
|
|
|
@TbTransportComponent |
|
|
|
public class DefaultTransportService implements TransportService { |
|
|
|
|
|
|
|
@Value("${transport.sessions.inactivity_timeout}") |
|
|
|
@ -363,7 +364,11 @@ public class DefaultTransportService implements TransportService { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { |
|
|
|
if (checkLimits(sessionInfo, msg, callback)) { |
|
|
|
int dataPoints = 0; |
|
|
|
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { |
|
|
|
dataPoints += tsKv.getKvCount(); |
|
|
|
} |
|
|
|
if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) { |
|
|
|
reportActivityInternal(sessionInfo); |
|
|
|
TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); |
|
|
|
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); |
|
|
|
@ -384,7 +389,7 @@ public class DefaultTransportService implements TransportService { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) { |
|
|
|
if (checkLimits(sessionInfo, msg, callback)) { |
|
|
|
if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) { |
|
|
|
reportActivityInternal(sessionInfo); |
|
|
|
TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); |
|
|
|
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); |
|
|
|
@ -574,37 +579,34 @@ public class DefaultTransportService implements TransportService { |
|
|
|
sessions.remove(toSessionId(sessionInfo)); |
|
|
|
} |
|
|
|
|
|
|
|
private TransportRateLimitType[] DEFAULT = new TransportRateLimitType[]{TransportRateLimitType.TENANT_MAX_MSGS, TransportRateLimitType.DEVICE_MAX_MSGS}; |
|
|
|
private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values(); |
|
|
|
|
|
|
|
@Override |
|
|
|
public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) { |
|
|
|
return checkLimits(sessionInfo, msg, callback, 0, DEFAULT); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback, int dataPoints, TransportRateLimitType... limits) { |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg); |
|
|
|
} |
|
|
|
TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); |
|
|
|
|
|
|
|
TransportRateLimit tenantRateLimit = rateLimitService.getRateLimit(tenantId, TransportRateLimitType.TENANT_MAX_MSGS); |
|
|
|
|
|
|
|
if (!tenantRateLimit.tryConsume()) { |
|
|
|
if (callback != null) { |
|
|
|
callback.onError(new TbRateLimitsException(EntityType.TENANT)); |
|
|
|
} |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
log.trace("[{}][{}] Tenant level rate limit detected: {}", toSessionId(sessionInfo), tenantId, msg); |
|
|
|
} |
|
|
|
return false; |
|
|
|
} |
|
|
|
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); |
|
|
|
TransportRateLimit deviceRateLimit = rateLimitService.getRateLimit(tenantId, deviceId, TransportRateLimitType.DEVICE_MAX_MSGS); |
|
|
|
if (!deviceRateLimit.tryConsume()) { |
|
|
|
|
|
|
|
TransportRateLimitType limit = rateLimitService.checkLimits(tenantId, deviceId, 0, limits); |
|
|
|
if (limit == null) { |
|
|
|
return true; |
|
|
|
} else { |
|
|
|
if (callback != null) { |
|
|
|
callback.onError(new TbRateLimitsException(EntityType.DEVICE)); |
|
|
|
callback.onError(new TbRateLimitsException(limit.isTenantLevel() ? EntityType.TENANT : EntityType.DEVICE)); |
|
|
|
} |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
log.trace("[{}][{}] Device level rate limit detected: {}", toSessionId(sessionInfo), deviceId, msg); |
|
|
|
log.trace("[{}][{}] {} rateLimit detected: {}", toSessionId(sessionInfo), tenantId, limit, msg); |
|
|
|
} |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) { |
|
|
|
|