|
|
|
@ -15,16 +15,20 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.server.service.subscription; |
|
|
|
|
|
|
|
import jakarta.annotation.PostConstruct; |
|
|
|
import jakarta.annotation.PreDestroy; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.context.annotation.Lazy; |
|
|
|
import org.springframework.context.event.EventListener; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.thingsboard.common.util.DeduplicationUtil; |
|
|
|
import org.thingsboard.common.util.DonAsynchron; |
|
|
|
import org.thingsboard.common.util.ThingsBoardExecutors; |
|
|
|
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|
|
|
import org.thingsboard.server.cache.limits.RateLimitService; |
|
|
|
import org.thingsboard.server.cluster.TbClusterService; |
|
|
|
import org.thingsboard.server.common.data.AttributeScope; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.alarm.AlarmInfo; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
@ -34,10 +38,12 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|
|
|
import org.thingsboard.server.common.data.limit.LimitedApi; |
|
|
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|
|
|
import org.thingsboard.server.common.msg.queue.ServiceType; |
|
|
|
import org.thingsboard.server.common.msg.queue.TbCallback; |
|
|
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|
|
|
import org.thingsboard.server.common.msg.tools.TbRateLimitsException; |
|
|
|
import org.thingsboard.server.dao.attributes.AttributesService; |
|
|
|
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|
|
|
import org.thingsboard.server.gen.transport.TransportProtos; |
|
|
|
@ -46,13 +52,12 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|
|
|
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; |
|
|
|
import org.thingsboard.server.queue.util.TbCoreComponent; |
|
|
|
import org.thingsboard.server.service.ws.WebSocketService; |
|
|
|
import org.thingsboard.server.service.ws.WebSocketSessionRef; |
|
|
|
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; |
|
|
|
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate; |
|
|
|
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; |
|
|
|
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; |
|
|
|
|
|
|
|
import jakarta.annotation.PostConstruct; |
|
|
|
import jakarta.annotation.PreDestroy; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.List; |
|
|
|
@ -88,13 +93,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer |
|
|
|
private final TbClusterService clusterService; |
|
|
|
private final SubscriptionManagerService subscriptionManagerService; |
|
|
|
private final WebSocketService webSocketService; |
|
|
|
private final RateLimitService rateLimitService; |
|
|
|
|
|
|
|
private ExecutorService tsCallBackExecutor; |
|
|
|
private ScheduledExecutorService staleSessionCleanupExecutor; |
|
|
|
|
|
|
|
@Value("${server.ws.rate_limits.subscriptions_per_tenant:2000:60}") |
|
|
|
private String subscriptionsPerTenantRateLimit; |
|
|
|
@Value("${server.ws.rate_limits.subscriptions_per_user:500:60}") |
|
|
|
private String subscriptionsPerUserRateLimit; |
|
|
|
|
|
|
|
public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider, |
|
|
|
PartitionService partitionService, TbClusterService clusterService, |
|
|
|
@Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService) { |
|
|
|
@Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService, |
|
|
|
RateLimitService rateLimitService) { |
|
|
|
this.attrService = attrService; |
|
|
|
this.tsService = tsService; |
|
|
|
this.serviceInfoProvider = serviceInfoProvider; |
|
|
|
@ -102,6 +114,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer |
|
|
|
this.clusterService = clusterService; |
|
|
|
this.subscriptionManagerService = subscriptionManagerService; |
|
|
|
this.webSocketService = webSocketService; |
|
|
|
this.rateLimitService = rateLimitService; |
|
|
|
} |
|
|
|
|
|
|
|
private String serviceId; |
|
|
|
@ -164,9 +177,18 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void addSubscription(TbSubscription<?> subscription) { |
|
|
|
public void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef) { |
|
|
|
TenantId tenantId = subscription.getTenantId(); |
|
|
|
EntityId entityId = subscription.getEntityId(); |
|
|
|
if (!rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, (Object) tenantId, subscriptionsPerTenantRateLimit)) { |
|
|
|
handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per tenant"); |
|
|
|
return; |
|
|
|
} |
|
|
|
if (sessionRef.getSecurityCtx() != null && !rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, sessionRef.getSecurityCtx().getId(), subscriptionsPerUserRateLimit)) { |
|
|
|
handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per user"); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription); |
|
|
|
SubscriptionModificationResult result; |
|
|
|
subsLock.lock(); |
|
|
|
@ -563,4 +585,13 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer |
|
|
|
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale); |
|
|
|
} |
|
|
|
|
|
|
|
private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) { |
|
|
|
String deduplicationKey = sessionRef.getSessionId() + message; |
|
|
|
if (!DeduplicationUtil.alreadyProcessed(deduplicationKey, TimeUnit.SECONDS.toMillis(15))) { |
|
|
|
log.info("{} {}", sessionRef, message); |
|
|
|
webSocketService.sendError(sessionRef, subscription.getSubscriptionId(), SubscriptionErrorCode.BAD_REQUEST, message); |
|
|
|
} |
|
|
|
throw new TbRateLimitsException(message); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|