Browse Source

Address PR review feedback

- Extract the tenant-profile fetch lock stripe count to a named constant (PR #15744)
- Trim the stale global-lock sentence from the per-tenant lock comment (PR #15744)
- Rename the rate-limit onCreate callback to onMiss and document its idempotency requirement (PR #15744)
- Reuse a single tenant profile local in update(TenantProfileUpdateResult) (PR #15744)
- Make the transport callback thread pool size configurable via transport.callback_thread_pool_size (PR #15744)
- Add a parameterized test locking the TransportLimitsType enum-to-profile-field mapping (PR #15744)
- Add device/gateway rate-limit coverage asserting update(tenantId) reaches tracked entities (PR #15744)
- Add a same-tenant fetch-dedup test and pin the cross-tenant test to distinct stripes (PR #15744)
pull/15744/head
Viacheslav Klimov 3 weeks ago
parent
commit
a1bf69cd27
Failed to extract signature
  1. 2
      application/src/main/resources/thingsboard.yml
  2. 19
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java
  3. 4
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  4. 10
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java
  5. 110
      common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitServiceTest.java
  6. 57
      common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCacheTest.java
  7. 2
      transport/coap/src/main/resources/tb-coap-transport.yml
  8. 2
      transport/http/src/main/resources/tb-http-transport.yml
  9. 2
      transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml
  10. 2
      transport/mqtt/src/main/resources/tb-mqtt-transport.yml
  11. 2
      transport/snmp/src/main/resources/tb-snmp-transport.yml

2
application/src/main/resources/thingsboard.yml

@ -1138,6 +1138,8 @@ transport:
timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}"
# Enable/disable http/mqtt/coap/lwm2m transport protocols (has higher priority than certain protocol's 'enabled' property)
api_enabled: "${TB_TRANSPORT_API_ENABLED:true}"
# Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently.
callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}"
log:
# Enable/Disable log of transport messages to telemetry. For example, logging of LwM2M registration update
enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"

19
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java

@ -107,11 +107,12 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
@Override
public void update(TenantProfileUpdateResult update) {
log.info("Received tenant profile update: {}", update.getProfile());
EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(update.getProfile(), TENANT_LIMITS);
EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(update.getProfile(), DEVICE_LIMITS);
EntityTransportRateLimits gatewayRateLimitPrototype = createRateLimits(update.getProfile(), GATEWAY_LIMITS);
EntityTransportRateLimits gatewayDeviceRateLimitPrototype = createRateLimits(update.getProfile(), GATEWAY_DEVICE_LIMITS);
TenantProfile profile = update.getProfile();
log.info("Received tenant profile update: {}", profile);
EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(profile, TENANT_LIMITS);
EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(profile, DEVICE_LIMITS);
EntityTransportRateLimits gatewayRateLimitPrototype = createRateLimits(profile, GATEWAY_LIMITS);
EntityTransportRateLimits gatewayDeviceRateLimitPrototype = createRateLimits(profile, GATEWAY_DEVICE_LIMITS);
for (TenantId tenantId : update.getAffectedTenants()) {
update(tenantId, tenantRateLimitPrototype, deviceRateLimitPrototype, gatewayRateLimitPrototype, gatewayDeviceRateLimitPrototype);
}
@ -301,15 +302,17 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
}
private <T extends EntityId> EntityTransportRateLimits getRateLimits(ConcurrentMap<T, EntityTransportRateLimits> limitsMap, TenantId tenantId,
T entityId, TransportLimitsType limitsType, Runnable onCreate) {
T entityId, TransportLimitsType limitsType, Runnable onMiss) {
EntityTransportRateLimits limits = limitsMap.get(entityId);
if (limits == null) {
// Resolve the tenant profile WITHOUT holding the ConcurrentHashMap bin lock: the fetch may
// block on a cross-service round-trip, so it must run before computeIfAbsent's mapping function.
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
limits = limitsMap.computeIfAbsent(entityId, k -> createRateLimits(tenantProfile, limitsType));
if (onCreate != null) {
onCreate.run();
// Runs on every observed miss, including callers that lost the computeIfAbsent race and got an
// existing value back - NOT only on actual creation, so the callback must be idempotent.
if (onMiss != null) {
onMiss.run();
}
}
return limits;

4
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -153,6 +153,8 @@ public class DefaultTransportService extends TransportActivityManager implements
private int notificationsPollDuration;
@Value("${transport.stats.enabled:false}")
private boolean statsEnabled;
@Value("${transport.callback_thread_pool_size:20}")
private int callbackThreadPoolSize;
@Autowired
@Lazy
@ -198,7 +200,7 @@ public class DefaultTransportService extends TransportActivityManager implements
this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreadPoolSize, getClass());
this.scheduler.scheduleAtFixedRate(this::invalidateRateLimits, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
transportApiRequestTemplate.setMessagesStats(transportApiStats);

10
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java

@ -44,9 +44,14 @@ import java.util.concurrent.locks.Lock;
@Slf4j
public class DefaultTransportTenantProfileCache implements TransportTenantProfileCache {
// Number of stripes for the per-tenant fetch locks. Only contended during concurrent cold-cache
// misses (cached tenants never take the lock), and concurrent fetches are already bounded by the
// transport callback pool, so this comfortably over-provisions the realistic concurrency.
private static final int TENANT_PROFILE_FETCH_LOCK_STRIPES = 1024;
// Bounded set of per-tenant locks: de-duplicates concurrent misses for the same tenant while
// letting different tenants fetch concurrently (eager array - no weak-ref overhead at this size).
private final Striped<Lock> tenantProfileFetchLocks = Striped.lock(1024);
private final Striped<Lock> tenantProfileFetchLocks = Striped.lock(TENANT_PROFILE_FETCH_LOCK_STRIPES);
private final ConcurrentMap<TenantProfileId, TenantProfile> profiles = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, TenantProfileId> tenantIds = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantProfileId, Set<TenantId>> tenantProfileIds = new ConcurrentHashMap<>();
@ -108,8 +113,7 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
TenantProfile profile = lookupCached(tenantId);
if (profile == null) {
// Per-tenant lock: de-duplicates concurrent misses for the SAME tenant while allowing
// different tenants to resolve their profiles concurrently. A single global lock here
// serializes the synchronous cross-service fetch below across the entire process.
// different tenants to resolve their profiles concurrently.
Lock lock = tenantProfileFetchLocks.get(tenantId);
lock.lock();
try {

110
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitServiceTest.java

@ -18,13 +18,18 @@ package org.thingsboard.server.common.transport.limits;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.transport.TransportTenantProfileCache;
import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -82,13 +87,116 @@ class DefaultTransportRateLimitServiceTest {
.isTrue();
}
@ParameterizedTest
@EnumSource(TransportLimitsType.class)
void eachLimitsTypeReadsItsOwnProfileFields(TransportLimitsType type) {
// Distinct sentinel per profile field so a transposed method reference (e.g. GATEWAY_DEVICE_LIMITS
// wired to the plain gateway getters) resolves to the wrong value and fails the assertion.
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
config.setTransportTenantMsgRateLimit("tenant-msg");
config.setTransportTenantTelemetryMsgRateLimit("tenant-tele-msg");
config.setTransportTenantTelemetryDataPointsRateLimit("tenant-tele-dp");
config.setTransportDeviceMsgRateLimit("device-msg");
config.setTransportDeviceTelemetryMsgRateLimit("device-tele-msg");
config.setTransportDeviceTelemetryDataPointsRateLimit("device-tele-dp");
config.setTransportGatewayMsgRateLimit("gateway-msg");
config.setTransportGatewayTelemetryMsgRateLimit("gateway-tele-msg");
config.setTransportGatewayTelemetryDataPointsRateLimit("gateway-tele-dp");
config.setTransportGatewayDeviceMsgRateLimit("gateway-device-msg");
config.setTransportGatewayDeviceTelemetryMsgRateLimit("gateway-device-tele-msg");
config.setTransportGatewayDeviceTelemetryDataPointsRateLimit("gateway-device-tele-dp");
String prefix = switch (type) {
case TENANT_LIMITS -> "tenant";
case DEVICE_LIMITS -> "device";
case GATEWAY_LIMITS -> "gateway";
case GATEWAY_DEVICE_LIMITS -> "gateway-device";
};
assertThat(type.getRegularMsgRateLimit().apply(config)).isEqualTo(prefix + "-msg");
assertThat(type.getTelemetryMsgRateLimit().apply(config)).isEqualTo(prefix + "-tele-msg");
assertThat(type.getTelemetryDataPointsRateLimit().apply(config)).isEqualTo(prefix + "-tele-dp");
}
@ParameterizedTest
@EnumSource(EntityLevel.class)
void profileUpdateReachesEntityTrackedDuringFirstCheck(EntityLevel level) {
DeviceId entity = new DeviceId(UUID.randomUUID());
when(tenantProfileCache.get(tenant)).thenReturn(profileWithRegularMsgLimit(level, "100:600"));
DefaultTransportRateLimitService service = new DefaultTransportRateLimitService(tenantProfileCache);
// First check resolves the (permissive) limit and must register the entity into the per-tenant
// tracking set via the onMiss callback - otherwise a later update(tenantId) can't reach it.
assertThat(level.check(service, tenant, entity))
.as("permissive limit should allow the first %s check", level).isNull();
// Tighten the limit to a single message and push a profile update for this tenant.
service.update(new TenantProfileUpdateResult(profileWithRegularMsgLimit(level, "1:600"), Set.of(tenant)));
// The freshly merged "1:600" bucket allows exactly one message...
assertThat(level.check(service, tenant, entity)).isNull();
// ...and blocks the next one. This only happens if update(tenantId) reached the tracked entity.
assertThat(level.check(service, tenant, entity))
.as("update(tenantId) must reach the tracked %s so the tightened limit applies", level).isNotNull();
}
private TenantProfile tenantProfile() {
return profileWith(new DefaultTenantProfileConfiguration());
}
private TenantProfile profileWithRegularMsgLimit(EntityLevel level, String regularMsgRateLimit) {
DefaultTenantProfileConfiguration config = new DefaultTenantProfileConfiguration();
level.setRegularMsgRateLimit(config, regularMsgRateLimit);
return profileWith(config);
}
private TenantProfile profileWith(DefaultTenantProfileConfiguration config) {
TenantProfile profile = new TenantProfile(new TenantProfileId(UUID.randomUUID()));
profile.setName("test-profile");
TenantProfileData profileData = new TenantProfileData();
profileData.setConfiguration(new DefaultTenantProfileConfiguration());
profileData.setConfiguration(config);
profile.setProfileData(profileData);
return profile;
}
private enum EntityLevel {
DEVICE {
@Override
void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value) {
config.setTransportDeviceMsgRateLimit(value);
}
@Override
Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId) {
return service.checkLimits(tenantId, null, entityId, 0, false);
}
},
GATEWAY {
@Override
void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value) {
config.setTransportGatewayMsgRateLimit(value);
}
@Override
Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId) {
return service.checkLimits(tenantId, entityId, null, 0, false);
}
},
GATEWAY_DEVICE {
@Override
void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value) {
config.setTransportGatewayDeviceMsgRateLimit(value);
}
@Override
Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId) {
return service.checkLimits(tenantId, null, entityId, 0, true);
}
};
abstract void setRegularMsgRateLimit(DefaultTenantProfileConfiguration config, String value);
abstract Object check(DefaultTransportRateLimitService service, TenantId tenantId, DeviceId entityId);
}
}

57
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCacheTest.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.transport.service;
import com.google.common.util.concurrent.Striped;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -31,6 +32,8 @@ import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@ -38,12 +41,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class DefaultTransportTenantProfileCacheTest {
@ -53,8 +59,22 @@ class DefaultTransportTenantProfileCacheTest {
private TransportRateLimitService rateLimitService;
private ExecutorService executor;
// Must match DefaultTransportTenantProfileCache.TENANT_PROFILE_FETCH_LOCK_STRIPES.
private static final int STRIPE_COUNT = 1024;
private final TenantId tenantA = TenantId.fromUUID(UUID.randomUUID());
private final TenantId tenantB = TenantId.fromUUID(UUID.randomUUID());
// Deterministically pick a tenant that maps to a DIFFERENT stripe than tenantA, so the cross-tenant
// test below cannot flake on the ~1/1024 chance two random UUIDs hash to the same stripe.
private final TenantId tenantB = differentStripeFrom(tenantA);
private static TenantId differentStripeFrom(TenantId other) {
Striped<Lock> probe = Striped.lock(STRIPE_COUNT);
TenantId candidate = TenantId.fromUUID(UUID.randomUUID());
while (probe.get(candidate) == probe.get(other)) {
candidate = TenantId.fromUUID(UUID.randomUUID());
}
return candidate;
}
@BeforeEach
void setUp() {
@ -107,6 +127,41 @@ class DefaultTransportTenantProfileCacheTest {
assertThat(tenantAResult.get(5, TimeUnit.SECONDS)).isNotNull();
}
@Test
void concurrentMissesForSameTenantDedupeToSingleFetch() throws Exception {
// The per-tenant lock exists precisely so that concurrent cold misses for the SAME tenant collapse
// into a single cross-service fetch (the rest are served from cache). Assert that contract directly.
int callers = 8;
CountDownLatch fetchStarted = new CountDownLatch(1);
CountDownLatch releaseFetch = new CountDownLatch(1);
when(transportService.getEntityProfile(any())).thenAnswer(invocation -> {
fetchStarted.countDown();
// Hold the (single) in-flight fetch open while the other callers pile up on the per-tenant lock.
releaseFetch.await(5, TimeUnit.SECONDS);
return responseFor(tenantA);
});
CountDownLatch allSubmitted = new CountDownLatch(callers);
List<Future<TenantProfile>> results = new ArrayList<>();
for (int i = 0; i < callers; i++) {
results.add(executor.submit(() -> {
allSubmitted.countDown();
return cache.get(tenantA);
}));
}
assertThat(allSubmitted.await(5, TimeUnit.SECONDS)).as("all callers should start").isTrue();
assertThat(fetchStarted.await(5, TimeUnit.SECONDS)).as("the first fetch should start").isTrue();
releaseFetch.countDown();
for (Future<TenantProfile> result : results) {
assertThat(result.get(5, TimeUnit.SECONDS)).isNotNull();
}
// All 8 callers resolved the same tenant, but only one of them hit the backend.
verify(transportService, times(1)).getEntityProfile(any());
}
private GetEntityProfileResponseMsg responseFor(TenantId tenantId) {
TenantProfile profile = new TenantProfile(new TenantProfileId(UUID.randomUUID()));
profile.setName("profile-" + tenantId.getId());

2
transport/coap/src/main/resources/tb-coap-transport.yml

@ -133,6 +133,8 @@ redis:
blockWhenExhausted: "${REDIS_POOL_CONFIG_BLOCK_WHEN_EXHAUSTED:true}"
transport:
# Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently.
callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}"
# Local CoAP transport parameters
coap:
# CoaP processing timeout in milliseconds

2
transport/http/src/main/resources/tb-http-transport.yml

@ -167,6 +167,8 @@ redis:
# HTTP server parameters
transport:
# Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently.
callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}"
http:
# HTTP request processing timeout in milliseconds
request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"

2
transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml

@ -134,6 +134,8 @@ redis:
# LWM2M server parameters
transport:
# Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently.
callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}"
sessions:
# Session inactivity timeout is a global configuration parameter that defines how long the device transport session will be opened after the last message arrives from the device.
# The parameter value is in milliseconds.

2
transport/mqtt/src/main/resources/tb-mqtt-transport.yml

@ -135,6 +135,8 @@ redis:
# MQTT server parameters
transport:
# Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently.
callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}"
mqtt:
# MQTT bind-address
bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"

2
transport/snmp/src/main/resources/tb-snmp-transport.yml

@ -134,6 +134,8 @@ redis:
# Snmp server parameters
transport:
# Size of the thread pool that executes transport API callbacks (session registration, telemetry/attribute and RPC responses, entity update notifications, and the tenant profile fetch on a cache miss). Bounds how many such callbacks - including those that block on a backend round-trip - can run concurrently.
callback_thread_pool_size: "${TB_TRANSPORT_CALLBACK_THREAD_POOL_SIZE:20}"
snmp:
# Enable/disable SNMP transport protocol
enabled: "${SNMP_ENABLED:true}"

Loading…
Cancel
Save