diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 2c24caa1f2..22f52aff03 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1138,6 +1138,8 @@ transport: timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}" # Enable/disable http/mqtt/coap/lwm2m transport protocols (has higher priority than certain protocol's 'enabled' property) api_enabled: "${TB_TRANSPORT_API_ENABLED:true}" + # Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently. + callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}" log: # Enable/Disable log of transport messages to telemetry. For example, logging of LwM2M registration update enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java index 30bedf4eb6..e7ef634c72 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java @@ -107,11 +107,12 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi @Override public void update(TenantProfileUpdateResult update) { - log.info("Received tenant profile update: {}", update.getProfile()); - EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(update.getProfile(), TENANT_LIMITS); - EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(update.getProfile(), DEVICE_LIMITS); - EntityTransportRateLimits gatewayRateLimitPrototype = createRateLimits(update.getProfile(), GATEWAY_LIMITS); - EntityTransportRateLimits gatewayDeviceRateLimitPrototype = createRateLimits(update.getProfile(), GATEWAY_DEVICE_LIMITS); + TenantProfile profile = update.getProfile(); + log.info("Received tenant profile update: {}", profile); + EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(profile, TENANT_LIMITS); + EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(profile, DEVICE_LIMITS); + EntityTransportRateLimits gatewayRateLimitPrototype = createRateLimits(profile, GATEWAY_LIMITS); + EntityTransportRateLimits gatewayDeviceRateLimitPrototype = createRateLimits(profile, GATEWAY_DEVICE_LIMITS); for (TenantId tenantId : update.getAffectedTenants()) { update(tenantId, tenantRateLimitPrototype, deviceRateLimitPrototype, gatewayRateLimitPrototype, gatewayDeviceRateLimitPrototype); } @@ -301,15 +302,17 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi } private EntityTransportRateLimits getRateLimits(ConcurrentMap limitsMap, TenantId tenantId, - T entityId, TransportLimitsType limitsType, Runnable onCreate) { + T entityId, TransportLimitsType limitsType, Runnable onMiss) { EntityTransportRateLimits limits = limitsMap.get(entityId); if (limits == null) { // Resolve the tenant profile WITHOUT holding the ConcurrentHashMap bin lock: the fetch may // block on a cross-service round-trip, so it must run before computeIfAbsent's mapping function. TenantProfile tenantProfile = tenantProfileCache.get(tenantId); limits = limitsMap.computeIfAbsent(entityId, k -> createRateLimits(tenantProfile, limitsType)); - if (onCreate != null) { - onCreate.run(); + // Runs on every observed miss, including callers that lost the computeIfAbsent race and got an + // existing value back - NOT only on actual creation, so the callback must be idempotent. + if (onMiss != null) { + onMiss.run(); } } return limits; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 240de91424..80980beeb7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -153,6 +153,8 @@ public class DefaultTransportService extends TransportActivityManager implements private int notificationsPollDuration; @Value("${transport.stats.enabled:false}") private boolean statsEnabled; + @Value("${transport.callback_thread_pool_size:20}") + private int callbackThreadPoolSize; @Autowired @Lazy @@ -198,7 +200,7 @@ public class DefaultTransportService extends TransportActivityManager implements this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer"); this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer"); this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer"); - this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); + this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreadPoolSize, getClass()); this.scheduler.scheduleAtFixedRate(this::invalidateRateLimits, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate(); transportApiRequestTemplate.setMessagesStats(transportApiStats); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java index 8eb8e795c1..ac2fd4c28a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java @@ -44,9 +44,14 @@ import java.util.concurrent.locks.Lock; @Slf4j public class DefaultTransportTenantProfileCache implements TransportTenantProfileCache { + // Number of stripes for the per-tenant fetch locks. Only contended during concurrent cold-cache + // misses (cached tenants never take the lock), and concurrent fetches are already bounded by the + // transport callback pool, so this comfortably over-provisions the realistic concurrency. + private static final int TENANT_PROFILE_FETCH_LOCK_STRIPES = 1024; + // Bounded set of per-tenant locks: de-duplicates concurrent misses for the same tenant while // letting different tenants fetch concurrently (eager array - no weak-ref overhead at this size). - private final Striped tenantProfileFetchLocks = Striped.lock(1024); + private final Striped tenantProfileFetchLocks = Striped.lock(TENANT_PROFILE_FETCH_LOCK_STRIPES); private final ConcurrentMap profiles = new ConcurrentHashMap<>(); private final ConcurrentMap tenantIds = new ConcurrentHashMap<>(); private final ConcurrentMap> tenantProfileIds = new ConcurrentHashMap<>(); @@ -108,8 +113,7 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil TenantProfile profile = lookupCached(tenantId); if (profile == null) { // Per-tenant lock: de-duplicates concurrent misses for the SAME tenant while allowing - // different tenants to resolve their profiles concurrently. A single global lock here - // serializes the synchronous cross-service fetch below across the entire process. + // different tenants to resolve their profiles concurrently. Lock lock = tenantProfileFetchLocks.get(tenantId); lock.lock(); try { diff --git a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitServiceTest.java b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitServiceTest.java index 0ca00ed1e4..b7b6a81abf 100644 --- a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitServiceTest.java +++ b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitServiceTest.java @@ -18,13 +18,18 @@ package org.thingsboard.server.common.transport.limits; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.transport.TransportTenantProfileCache; +import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -82,13 +87,116 @@ class DefaultTransportRateLimitServiceTest { .isTrue(); } + @ParameterizedTest + @EnumSource(TransportLimitsType.class) + void eachLimitsTypeReadsItsOwnProfileFields(TransportLimitsType type) { + // Distinct sentinel per profile field so a transposed method reference (e.g. GATEWAY_DEVICE_LIMITS + // wired to the plain gateway getters) resolves to the wrong value and fails the assertion. + DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); + config.setTransportTenantMsgRateLimit("tenant-msg"); + config.setTransportTenantTelemetryMsgRateLimit("tenant-tele-msg"); + config.setTransportTenantTelemetryDataPointsRateLimit("tenant-tele-dp"); + config.setTransportDeviceMsgRateLimit("device-msg"); + config.setTransportDeviceTelemetryMsgRateLimit("device-tele-msg"); + config.setTransportDeviceTelemetryDataPointsRateLimit("device-tele-dp"); + config.setTransportGatewayMsgRateLimit("gateway-msg"); + config.setTransportGatewayTelemetryMsgRateLimit("gateway-tele-msg"); + config.setTransportGatewayTelemetryDataPointsRateLimit("gateway-tele-dp"); + config.setTransportGatewayDeviceMsgRateLimit("gateway-device-msg"); + config.setTransportGatewayDeviceTelemetryMsgRateLimit("gateway-device-tele-msg"); + config.setTransportGatewayDeviceTelemetryDataPointsRateLimit("gateway-device-tele-dp"); + + String prefix = switch (type) { + case TENANT_LIMITS -> "tenant"; + case DEVICE_LIMITS -> "device"; + case GATEWAY_LIMITS -> "gateway"; + case GATEWAY_DEVICE_LIMITS -> "gateway-device"; + }; + + assertThat(type.getRegularMsgRateLimit().apply(config)).isEqualTo(prefix + "-msg"); + assertThat(type.getTelemetryMsgRateLimit().apply(config)).isEqualTo(prefix + "-tele-msg"); + assertThat(type.getTelemetryDataPointsRateLimit().apply(config)).isEqualTo(prefix + "-tele-dp"); + } + + @ParameterizedTest + @EnumSource(EntityLevel.class) + void profileUpdateReachesEntityTrackedDuringFirstCheck(EntityLevel level) { + DeviceId entity = new DeviceId(UUID.randomUUID()); + when(tenantProfileCache.get(tenant)).thenReturn(profileWithRegularMsgLimit(level, "100:600")); + DefaultTransportRateLimitService service = new DefaultTransportRateLimitService(tenantProfileCache); + + // First check resolves the (permissive) limit and must register the entity into the per-tenant + // tracking set via the onMiss callback - otherwise a later update(tenantId) can't reach it. + assertThat(level.check(service, tenant, entity)) + .as("permissive limit should allow the first %s check", level).isNull(); + + // Tighten the limit to a single message and push a profile update for this tenant. + service.update(new TenantProfileUpdateResult(profileWithRegularMsgLimit(level, "1:600"), Set.of(tenant))); + + // The freshly merged "1:600" bucket allows exactly one message... + assertThat(level.check(service, tenant, entity)).isNull(); + // ...and blocks the next one. This only happens if update(tenantId) reached the tracked entity. + assertThat(level.check(service, tenant, entity)) + .as("update(tenantId) must reach the tracked %s so the tightened limit applies", level).isNotNull(); + } + private TenantProfile tenantProfile() { + return profileWith(new DefaultTenantProfileConfiguration()); + } + + private TenantProfile profileWithRegularMsgLimit(EntityLevel level, String regularMsgRateLimit) { + DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration(); + level.setRegularMsgRateLimit(config, regularMsgRateLimit); + return profileWith(config); + } + + private TenantProfile profileWith(DefaultTenantProfileConfiguration config) { TenantProfile profile = new TenantProfile(new TenantProfileId(UUID.randomUUID())); profile.setName("test-profile"); TenantProfileData profileData = new TenantProfileData(); - profileData.setConfiguration(new DefaultTenantProfileConfiguration()); + profileData.setConfiguration(config); profile.setProfileData(profileData); return profile; } + private enum EntityLevel { + DEVICE { + @Override + void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value) { + config.setTransportDeviceMsgRateLimit(value); + } + + @Override + Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId) { + return service.checkLimits(tenantId, null, entityId, 0, false); + } + }, + GATEWAY { + @Override + void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value) { + config.setTransportGatewayMsgRateLimit(value); + } + + @Override + Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId) { + return service.checkLimits(tenantId, entityId, null, 0, false); + } + }, + GATEWAY_DEVICE { + @Override + void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value) { + config.setTransportGatewayDeviceMsgRateLimit(value); + } + + @Override + Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId) { + return service.checkLimits(tenantId, null, entityId, 0, true); + } + }; + + abstract void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value); + + abstract Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId); + } + } diff --git a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCacheTest.java b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCacheTest.java index 4da7767330..d2b4544f53 100644 --- a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCacheTest.java +++ b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCacheTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport.service; +import com.google.common.util.concurrent.Striped; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,6 +32,8 @@ import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -38,12 +41,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class DefaultTransportTenantProfileCacheTest { @@ -53,8 +59,22 @@ class DefaultTransportTenantProfileCacheTest { private TransportRateLimitService rateLimitService; private ExecutorService executor; + // Must match DefaultTransportTenantProfileCache.TENANT_PROFILE_FETCH_LOCK_STRIPES. + private static final int STRIPE_COUNT = 1024; + private final TenantId tenantA = TenantId.fromUUID(UUID.randomUUID()); - private final TenantId tenantB = TenantId.fromUUID(UUID.randomUUID()); + // Deterministically pick a tenant that maps to a DIFFERENT stripe than tenantA, so the cross-tenant + // test below cannot flake on the ~1/1024 chance two random UUIDs hash to the same stripe. + private final TenantId tenantB = differentStripeFrom(tenantA); + + private static TenantId differentStripeFrom(TenantId other) { + Striped probe = Striped.lock(STRIPE_COUNT); + TenantId candidate = TenantId.fromUUID(UUID.randomUUID()); + while (probe.get(candidate) == probe.get(other)) { + candidate = TenantId.fromUUID(UUID.randomUUID()); + } + return candidate; + } @BeforeEach void setUp() { @@ -107,6 +127,41 @@ class DefaultTransportTenantProfileCacheTest { assertThat(tenantAResult.get(5, TimeUnit.SECONDS)).isNotNull(); } + @Test + void concurrentMissesForSameTenantDedupeToSingleFetch() throws Exception { + // The per-tenant lock exists precisely so that concurrent cold misses for the SAME tenant collapse + // into a single cross-service fetch (the rest are served from cache). Assert that contract directly. + int callers = 8; + CountDownLatch fetchStarted = new CountDownLatch(1); + CountDownLatch releaseFetch = new CountDownLatch(1); + + when(transportService.getEntityProfile(any())).thenAnswer(invocation -> { + fetchStarted.countDown(); + // Hold the (single) in-flight fetch open while the other callers pile up on the per-tenant lock. + releaseFetch.await(5, TimeUnit.SECONDS); + return responseFor(tenantA); + }); + + CountDownLatch allSubmitted = new CountDownLatch(callers); + List> results = new ArrayList<>(); + for (int i = 0; i < callers; i++) { + results.add(executor.submit(() -> { + allSubmitted.countDown(); + return cache.get(tenantA); + })); + } + + assertThat(allSubmitted.await(5, TimeUnit.SECONDS)).as("all callers should start").isTrue(); + assertThat(fetchStarted.await(5, TimeUnit.SECONDS)).as("the first fetch should start").isTrue(); + releaseFetch.countDown(); + + for (Future result : results) { + assertThat(result.get(5, TimeUnit.SECONDS)).isNotNull(); + } + // All 8 callers resolved the same tenant, but only one of them hit the backend. + verify(transportService, times(1)).getEntityProfile(any()); + } + private GetEntityProfileResponseMsg responseFor(TenantId tenantId) { TenantProfile profile = new TenantProfile(new TenantProfileId(UUID.randomUUID())); profile.setName("profile-" + tenantId.getId()); diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 1554b3fc24..97d53384ba 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -133,6 +133,8 @@ redis: blockWhenExhausted: "${REDIS_POOL_CONFIG_BLOCK_WHEN_EXHAUSTED:true}" transport: + # Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently. + callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}" # Local CoAP transport parameters coap: # CoaP processing timeout in milliseconds diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index c878887a01..bf8d5e1542 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -167,6 +167,8 @@ redis: # HTTP server parameters transport: + # Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently. + callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}" http: # HTTP request processing timeout in milliseconds request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 51b1ad0a2b..1c14202efd 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -134,6 +134,8 @@ redis: # LWM2M server parameters transport: + # Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently. + callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}" sessions: # Session inactivity timeout is a global configuration parameter that defines how long the device transport session will be opened after the last message arrives from the device. # The parameter value is in milliseconds. diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index dfe35db29c..0c011e207e 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -135,6 +135,8 @@ redis: # MQTT server parameters transport: + # Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently. + callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}" mqtt: # MQTT bind-address bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index d021030a91..b5ec777af6 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -134,6 +134,8 @@ redis: # Snmp server parameters transport: + # Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently. + callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}" snmp: # Enable/disable SNMP transport protocol enabled: "${SNMP_ENABLED:true}"