From 6ea39e835e3203bfb4bcc6d6e6f7cf7e4552d16d Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 15 Oct 2020 18:16:27 +0300 Subject: [PATCH] Transport Rate Limits are now configurable via Tenant Profile --- .../device_profile/rule_chain_template.json | 3 +- .../server/controller/TenantController.java | 4 +- .../controller/TenantProfileController.java | 5 +- .../profile/DefaultTbDeviceProfileCache.java | 16 +- .../service/profile/TbDeviceProfileCache.java | 3 + .../queue/DefaultTbClusterService.java | 56 ++++-- .../service/queue/TbClusterService.java | 13 +- .../transport/DefaultTransportApiService.java | 63 +++---- .../src/main/resources/thingsboard.yml | 6 +- common/queue/src/main/proto/queue.proto | 47 +++-- ....java => TransportDeviceProfileCache.java} | 2 +- .../common/transport/TransportService.java | 9 +- .../TransportTenantProfileCache.java | 38 +++++ .../DefaultTransportRateLimitFactory.java | 47 +++++ .../DefaultTransportRateLimitService.java | 115 +++++++++++++ .../limits/DummyTransportRateLimit.java | 30 ++++ .../limits/SimpleTransportRateLimit.java | 34 ++++ .../transport/limits/TransportRateLimit.java | 24 +++ .../limits/TransportRateLimitFactory.java | 24 +++ .../limits/TransportRateLimitService.java | 36 ++++ .../limits/TransportRateLimitType.java | 33 ++++ .../profile/TenantProfileUpdateResult.java | 30 ++++ ...> DefaultTransportDeviceProfileCache.java} | 6 +- .../service/DefaultTransportService.java | 160 ++++++++---------- .../DefaultTransportTenantProfileCache.java | 154 +++++++++++++++++ .../TransportTenantRoutingInfoService.java | 24 +-- .../util/DataDecodingEncodingService.java | 2 - .../src/main/resources/tb-coap-transport.yml | 4 - .../src/main/resources/tb-http-transport.yml | 4 - .../src/main/resources/tb-mqtt-transport.yml | 4 - 30 files changed, 785 insertions(+), 211 deletions(-) rename common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/{TransportProfileCache.java => TransportDeviceProfileCache.java} (95%) create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitFactory.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitFactory.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/profile/TenantProfileUpdateResult.java rename common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/{DefaultTransportProfileCache.java => DefaultTransportDeviceProfileCache.java} (91%) create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java diff --git a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json index 3d076ff812..da9a95b423 100644 --- a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json +++ b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json @@ -94,7 +94,8 @@ "name": "Device Profile Node", "debugMode": false, "configuration": { - "persistAlarmRulesState": false + "persistAlarmRulesState": false, + "fetchAlarmRulesStateOnStart": false } } ], diff --git a/application/src/main/java/org/thingsboard/server/controller/TenantController.java b/application/src/main/java/org/thingsboard/server/controller/TenantController.java index 7c23545374..83e3d98b1b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TenantController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TenantController.java @@ -92,6 +92,7 @@ public class TenantController extends BaseController { installScripts.createDefaultRuleChains(tenant.getId()); } tenantProfileCache.evict(tenant.getId()); + tbClusterService.onTenantChange(tenant, null); return tenant; } catch (Exception e) { throw handleException(e); @@ -105,9 +106,10 @@ public class TenantController extends BaseController { checkParameter("tenantId", strTenantId); try { TenantId tenantId = new TenantId(toUUID(strTenantId)); - checkTenantId(tenantId, Operation.DELETE); + Tenant tenant = checkTenantId(tenantId, Operation.DELETE); tenantService.deleteTenant(tenantId); tenantProfileCache.evict(tenantId); + tbClusterService.onTenantDelete(tenant, null); tbClusterService.onEntityStateChange(tenantId, tenantId, ComponentLifecycleEvent.DELETED); } catch (Exception e) { throw handleException(e); diff --git a/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java b/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java index 4c9d541ea6..0941fac725 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; @@ -96,6 +97,7 @@ public class TenantProfileController extends BaseController { tenantProfile = checkNotNull(tenantProfileService.saveTenantProfile(getTenantId(), tenantProfile)); tenantProfileCache.put(tenantProfile); + tbClusterService.onTenantProfileChange(tenantProfile, null); tbClusterService.onEntityStateChange(TenantId.SYS_TENANT_ID, tenantProfile.getId(), newTenantProfile ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); return tenantProfile; @@ -111,8 +113,9 @@ public class TenantProfileController extends BaseController { checkParameter("tenantProfileId", strTenantProfileId); try { TenantProfileId tenantProfileId = new TenantProfileId(toUUID(strTenantProfileId)); - checkTenantProfileId(tenantProfileId, Operation.DELETE); + TenantProfile profile = checkTenantProfileId(tenantProfileId, Operation.DELETE); tenantProfileService.deleteTenantProfile(getTenantId(), tenantProfileId); + tbClusterService.onTenantProfileDelete(profile, null); } catch (Exception e) { throw handleException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java b/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java index 2d0861f636..6784405adb 100644 --- a/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java @@ -61,7 +61,7 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); if (profile != null) { deviceProfilesMap.put(deviceProfileId, profile); - log.info("[{}] Fetch device profile into cache: {}", profile.getId(), profile); + log.debug("[{}] Fetch device profile into cache: {}", profile.getId(), profile); } } } finally { @@ -91,7 +91,7 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { public void put(DeviceProfile profile) { if (profile.getId() != null) { deviceProfilesMap.put(profile.getId(), profile); - log.info("[{}] pushed device profile to cache: {}", profile.getId(), profile); + log.debug("[{}] pushed device profile to cache: {}", profile.getId(), profile); notifyListeners(profile); } } @@ -99,7 +99,7 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { @Override public void evict(TenantId tenantId, DeviceProfileId profileId) { DeviceProfile oldProfile = deviceProfilesMap.remove(profileId); - log.info("[{}] evict device profile from cache: {}", profileId, oldProfile); + log.debug("[{}] evict device profile from cache: {}", profileId, oldProfile); DeviceProfile newProfile = get(tenantId, profileId); if (newProfile != null) { notifyListeners(newProfile); @@ -116,6 +116,16 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { listeners.computeIfAbsent(tenantId, id -> new ConcurrentHashMap<>()).put(listenerId, listener); } + @Override + public DeviceProfile find(DeviceProfileId deviceProfileId) { + return deviceProfileService.findDeviceProfileById(TenantId.SYS_TENANT_ID, deviceProfileId); + } + + @Override + public DeviceProfile findOrCreateDeviceProfile(TenantId tenantId, String profileName) { + return deviceProfileService.findOrCreateDeviceProfile(tenantId, profileName); + } + @Override public void removeListener(TenantId tenantId, EntityId listenerId) { ConcurrentMap> tenantListeners = listeners.get(tenantId); diff --git a/application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java b/application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java index c067ead6d2..e65f297d53 100644 --- a/application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java @@ -29,4 +29,7 @@ public interface TbDeviceProfileCache extends RuleEngineDeviceProfileCache { void evict(DeviceId id); + DeviceProfile find(DeviceProfileId deviceProfileId); + + DeviceProfile findOrCreateDeviceProfile(TenantId tenantId, String deviceType); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index a4de2d9737..9451b58e9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -23,11 +23,15 @@ import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.HasName; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; @@ -189,21 +193,51 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback) { - log.trace("[{}][{}] Processing device profile [{}] change event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); - TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder() - .setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build(); - broadcast(transportMsg); + onEntityChange(deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile, callback); + } + + @Override + public void onTenantProfileChange(TenantProfile tenantProfile, TbQueueCallback callback) { + onEntityChange(TenantId.SYS_TENANT_ID, tenantProfile.getId(), tenantProfile, callback); + } + + @Override + public void onTenantChange(Tenant tenant, TbQueueCallback callback) { + onEntityChange(TenantId.SYS_TENANT_ID, tenant.getId(), tenant, callback); + } + + @Override + public void onDeviceProfileDelete(DeviceProfile entity, TbQueueCallback callback) { + onEntityDelete(entity.getTenantId(), entity.getId(), entity.getName(), callback); } @Override - public void onDeviceProfileDelete(DeviceProfile deviceProfile, TbQueueCallback callback) { - log.trace("[{}][{}] Processing device profile [{}] delete event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); - TransportProtos.DeviceProfileDeleteMsg profileDeleteMsg = TransportProtos.DeviceProfileDeleteMsg.newBuilder() - .setProfileIdMSB(deviceProfile.getId().getId().getMostSignificantBits()) - .setProfileIdLSB(deviceProfile.getId().getId().getLeastSignificantBits()) + public void onTenantProfileDelete(TenantProfile entity, TbQueueCallback callback) { + onEntityDelete(TenantId.SYS_TENANT_ID, entity.getId(), entity.getName(), callback); + } + + @Override + public void onTenantDelete(Tenant entity, TbQueueCallback callback) { + onEntityDelete(TenantId.SYS_TENANT_ID, entity.getId(), entity.getName(), callback); + } + + public void onEntityChange(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) { + log.trace("[{}][{}][{}] Processing [{}] change event", tenantId, entityid.getEntityType(), entityid.getId(), entity.getName()); + TransportProtos.EntityUpdateMsg entityUpdateMsg = TransportProtos.EntityUpdateMsg.newBuilder() + .setEntityType(entityid.getEntityType().name()) + .setData(ByteString.copyFrom(encodingService.encode(entity))).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityUpdateMsg(entityUpdateMsg).build(); + broadcast(transportMsg); + } + + private void onEntityDelete(TenantId tenantId, EntityId entityId, String name, TbQueueCallback callback) { + log.trace("[{}][{}][{}] Processing [{}] delete event", tenantId, entityId.getEntityType(), entityId.getId(), name); + TransportProtos.EntityDeleteMsg entityDeleteMsg = TransportProtos.EntityDeleteMsg.newBuilder() + .setEntityType(entityId.getEntityType().name()) + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()) .build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileDeleteMsg(profileDeleteMsg).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityDeleteMsg(entityDeleteMsg).build(); broadcast(transportMsg); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java index cf212a06f5..838dbe0eef 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java @@ -17,7 +17,8 @@ package org.thingsboard.server.service.queue; import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -53,5 +54,13 @@ public interface TbClusterService { void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback); - void onDeviceProfileDelete(DeviceProfile deviceProfileId, TbQueueCallback callback); + void onDeviceProfileDelete(DeviceProfile deviceProfile, TbQueueCallback callback); + + void onTenantProfileChange(TenantProfile tenantProfile, TbQueueCallback callback); + + void onTenantProfileDelete(TenantProfile tenantProfile, TbQueueCallback callback); + + void onTenantChange(Tenant tenant, TbQueueCallback callback); + + void onTenantDelete(Tenant tenant, TbQueueCallback callback); } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index c806757ddb..10360769a7 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -28,6 +28,7 @@ import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; import org.thingsboard.server.common.data.device.credentials.ProvisionDeviceCredentialsData; @@ -45,21 +46,19 @@ import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.dao.device.DeviceCredentialsService; -import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceProvisionService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.provision.ProvisionRequest; import org.thingsboard.server.dao.device.provision.ProvisionResponse; import org.thingsboard.server.dao.relation.RelationService; -import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.util.mapping.JacksonUtil; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @@ -70,6 +69,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.dao.device.provision.ProvisionFailedException; import org.thingsboard.server.service.executors.DbCallbackExecutorService; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbTenantProfileCache; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.state.DeviceStateService; @@ -90,9 +90,7 @@ public class DefaultTransportApiService implements TransportApiService { private static final ObjectMapper mapper = new ObjectMapper(); - //TODO: Constructor dependencies; - private final DeviceProfileService deviceProfileService; - private final TenantService tenantService; + private final TbDeviceProfileCache deviceProfileCache; private final TbTenantProfileCache tenantProfileCache; private final DeviceService deviceService; private final RelationService relationService; @@ -103,17 +101,15 @@ public class DefaultTransportApiService implements TransportApiService { private final DataDecodingEncodingService dataDecodingEncodingService; private final DeviceProvisionService deviceProvisionService; - private final ConcurrentMap deviceCreationLocks = new ConcurrentHashMap<>(); - public DefaultTransportApiService(DeviceProfileService deviceProfileService, TenantService tenantService, + public DefaultTransportApiService(TbDeviceProfileCache deviceProfileCache, TbTenantProfileCache tenantProfileCache, DeviceService deviceService, RelationService relationService, DeviceCredentialsService deviceCredentialsService, DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService, TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService, DeviceProvisionService deviceProvisionService) { - this.deviceProfileService = deviceProfileService; - this.tenantService = tenantService; + this.deviceProfileCache = deviceProfileCache; this.tenantProfileCache = tenantProfileCache; this.deviceService = deviceService; this.relationService = relationService; @@ -143,11 +139,8 @@ public class DefaultTransportApiService implements TransportApiService { } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { return Futures.transform(handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); - } else if (transportApiRequestMsg.hasGetTenantRoutingInfoRequestMsg()) { - return Futures.transform(handle(transportApiRequestMsg.getGetTenantRoutingInfoRequestMsg()), - value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); - } else if (transportApiRequestMsg.hasGetDeviceProfileRequestMsg()) { - return Futures.transform(handle(transportApiRequestMsg.getGetDeviceProfileRequestMsg()), + } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { + return Futures.transform(handle(transportApiRequestMsg.getEntityProfileRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); } else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) { return Futures.transform(handle(transportApiRequestMsg.getProvisionDeviceRequestMsg()), @@ -238,7 +231,7 @@ public class DefaultTransportApiService implements TransportApiService { device.setName(requestMsg.getDeviceName()); device.setType(requestMsg.getDeviceType()); device.setCustomerId(gateway.getCustomerId()); - DeviceProfile deviceProfile = deviceProfileService.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); + DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); device.setDeviceProfileId(deviceProfile.getId()); device = deviceService.saveDevice(device); relationService.saveRelationAsync(TenantId.SYS_TENANT_ID, new EntityRelation(gateway.getId(), device.getId(), "Created")); @@ -258,7 +251,7 @@ public class DefaultTransportApiService implements TransportApiService { } GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() .setDeviceInfo(getDeviceInfoProto(device)); - DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(device.getTenantId(), device.getDeviceProfileId()); + DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); if (deviceProfile != null) { builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); } else { @@ -320,23 +313,21 @@ public class DefaultTransportApiService implements TransportApiService { .build(); } - private ListenableFuture handle(GetTenantRoutingInfoRequestMsg requestMsg) { - TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB())); - - ListenableFuture tenantProfileFuture = Futures.immediateFuture(tenantProfileCache.get(tenantId)); - return Futures.transform(tenantProfileFuture, tenantProfile -> TransportApiResponseMsg.newBuilder() - .setGetTenantRoutingInfoResponseMsg(GetTenantRoutingInfoResponseMsg.newBuilder().setIsolatedTbCore(tenantProfile.isIsolatedTbCore()) - .setIsolatedTbRuleEngine(tenantProfile.isIsolatedTbRuleEngine()).build()).build(), dbCallbackExecutorService); - } - - private ListenableFuture handle(TransportProtos.GetDeviceProfileRequestMsg requestMsg) { - DeviceProfileId profileId = new DeviceProfileId(new UUID(requestMsg.getProfileIdMSB(), requestMsg.getProfileIdLSB())); - DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(TenantId.SYS_TENANT_ID, profileId); - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder() - .setGetDeviceProfileResponseMsg( - TransportProtos.GetDeviceProfileResponseMsg.newBuilder() - .setData(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))) - .build()).build()); + private ListenableFuture handle(GetEntityProfileRequestMsg requestMsg) { + EntityType entityType = EntityType.valueOf(requestMsg.getEntityType()); + UUID entityUuid = new UUID(requestMsg.getEntityIdMSB(), requestMsg.getEntityIdLSB()); + ByteString data; + if (entityType.equals(EntityType.DEVICE_PROFILE)) { + DeviceProfileId deviceProfileId = new DeviceProfileId(entityUuid); + DeviceProfile deviceProfile = deviceProfileCache.find(deviceProfileId); + data = ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)); + } else if (entityType.equals(EntityType.TENANT)) { + TenantProfile tenantProfile = tenantProfileCache.get(new TenantId(entityUuid)); + data = ByteString.copyFrom(dataDecodingEncodingService.encode(tenantProfile)); + } else { + throw new RuntimeException("Invalid entity profile request: " + entityType); + } + return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(GetEntityProfileResponseMsg.newBuilder().setData(data).build()).build()); } private ListenableFuture getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) { @@ -348,7 +339,7 @@ public class DefaultTransportApiService implements TransportApiService { try { ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder(); builder.setDeviceInfo(getDeviceInfoProto(device)); - DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(device.getTenantId(), device.getDeviceProfileId()); + DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); if (deviceProfile != null) { builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); } else { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d0ddeb611a..deb28ffd99 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -502,7 +502,7 @@ js: remote: # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}" - # Maximum time in seconds for black listed function to stay in the list. + # Maximum time in seconds for black listed function to stay in 1:the list. max_black_list_duration_sec: "${REMOTE_JS_SANDBOX_MAX_BLACKLIST_DURATION_SEC:60}" stats: enabled: "${TB_JS_REMOTE_STATS_ENABLED:false}" @@ -512,10 +512,6 @@ transport: sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" - rate_limits: - enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" - tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" - device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 166f94b864..01d8ca4524 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -177,32 +177,26 @@ message GetOrCreateDeviceFromGatewayResponseMsg { bytes profileBody = 2; } -message GetTenantRoutingInfoRequestMsg { - int64 tenantIdMSB = 1; - int64 tenantIdLSB = 2; -} - -message GetTenantRoutingInfoResponseMsg { - bool isolatedTbCore = 1; - bool isolatedTbRuleEngine = 2; -} - -message GetDeviceProfileRequestMsg { - int64 profileIdMSB = 1; - int64 profileIdLSB = 2; +message GetEntityProfileRequestMsg { + string entityType = 1; + int64 entityIdMSB = 2; + int64 entityIdLSB = 3; } -message GetDeviceProfileResponseMsg { - bytes data = 1; +message GetEntityProfileResponseMsg { + string entityType = 1; + bytes data = 2; } -message DeviceProfileUpdateMsg { - bytes data = 1; +message EntityUpdateMsg { + string entityType = 1; + bytes data = 2; } -message DeviceProfileDeleteMsg { - int64 profileIdMSB = 1; - int64 profileIdLSB = 2; +message EntityDeleteMsg { + string entityType = 1; + int64 entityIdMSB = 2; + int64 entityIdLSB = 3; } message SessionCloseNotificationProto { @@ -482,8 +476,7 @@ message TransportApiRequestMsg { ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1; ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2; GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3; - GetTenantRoutingInfoRequestMsg getTenantRoutingInfoRequestMsg = 4; - GetDeviceProfileRequestMsg getDeviceProfileRequestMsg = 5; + GetEntityProfileRequestMsg entityProfileRequestMsg = 4; ValidateBasicMqttCredRequestMsg validateBasicMqttCredRequestMsg = 6; ProvisionDeviceRequestMsg provisionDeviceRequestMsg = 7; } @@ -492,9 +485,8 @@ message TransportApiRequestMsg { message TransportApiResponseMsg { ValidateDeviceCredentialsResponseMsg validateCredResponseMsg = 1; GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2; - GetTenantRoutingInfoResponseMsg getTenantRoutingInfoResponseMsg = 4; - GetDeviceProfileResponseMsg getDeviceProfileResponseMsg = 5; - ProvisionDeviceResponseMsg provisionDeviceResponseMsg = 6; + GetEntityProfileResponseMsg entityProfileResponseMsg = 3; + ProvisionDeviceResponseMsg provisionDeviceResponseMsg = 4; } /* Messages that are handled by ThingsBoard Core Service */ @@ -535,7 +527,8 @@ message ToTransportMsg { AttributeUpdateNotificationMsg attributeUpdateNotification = 5; ToDeviceRpcRequestMsg toDeviceRequest = 6; ToServerRpcResponseMsg toServerResponse = 7; - DeviceProfileUpdateMsg deviceProfileUpdateMsg = 8; - DeviceProfileDeleteMsg deviceProfileDeleteMsg = 9; + /* For Tenant, TenantProfile and DeviceProfile */ + EntityUpdateMsg entityUpdateMsg = 8; + EntityDeleteMsg entityDeleteMsg = 9; ProvisionDeviceResponseMsg provisionResponse = 10; } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportDeviceProfileCache.java similarity index 95% rename from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java rename to common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportDeviceProfileCache.java index ee05e59010..d56c0291b7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportDeviceProfileCache.java @@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import java.util.Optional; -public interface TransportProfileCache { +public interface TransportDeviceProfileCache { DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 775a91f720..9a2fd75d47 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -20,11 +20,12 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; @@ -47,7 +48,7 @@ import java.util.concurrent.ScheduledExecutorService; */ public interface TransportService { - GetTenantRoutingInfoResponseMsg getRoutingInfo(GetTenantRoutingInfoRequestMsg msg); + GetEntityProfileResponseMsg getRoutingInfo(GetEntityProfileRequestMsg msg); void process(DeviceTransportType transportType, ValidateDeviceTokenRequestMsg msg, TransportServiceCallback callback); @@ -64,8 +65,6 @@ public interface TransportService { void process(ProvisionDeviceRequestMsg msg, TransportServiceCallback callback); - void getDeviceProfile(DeviceProfileId deviceProfileId, TransportServiceCallback callback); - void onProfileUpdate(DeviceProfile deviceProfile); boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java new file mode 100644 index 0000000000..1f6b31a356 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport; + +import com.google.protobuf.ByteString; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; +import org.thingsboard.server.queue.discovery.TenantRoutingInfo; +import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; + +import java.util.Set; + +public interface TransportTenantProfileCache { + + TenantProfile get(TenantId tenantId); + + TenantProfileUpdateResult put(ByteString profileBody); + + boolean put(TenantId tenantId, TenantProfileId profileId); + + Set remove(TenantProfileId profileId); + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitFactory.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitFactory.java new file mode 100644 index 0000000000..362502bf17 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitFactory.java @@ -0,0 +1,47 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import org.thingsboard.server.common.msg.tools.TbRateLimits; + +@Slf4j +@Component +public class DefaultTransportRateLimitFactory implements TransportRateLimitFactory { + + private static final DummyTransportRateLimit ALWAYS_TRUE = new DummyTransportRateLimit(); + + @Override + public TransportRateLimit create(TransportRateLimitType type, Object configuration) { + if (!StringUtils.isEmpty(configuration)) { + try { + return new SimpleTransportRateLimit(new TbRateLimits(configuration.toString()), configuration.toString()); + } catch (Exception e) { + log.warn("[{}] Failed to init rate limit with configuration: {}", type, configuration, e); + return ALWAYS_TRUE; + } + } else { + return ALWAYS_TRUE; + } + } + + @Override + public TransportRateLimit createDefault(TransportRateLimitType type) { + return ALWAYS_TRUE; + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java new file mode 100644 index 0000000000..9446022ec5 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java @@ -0,0 +1,115 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.TenantProfileData; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.transport.TransportTenantProfileCache; +import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Service +@Slf4j +public class DefaultTransportRateLimitService implements TransportRateLimitService { + + private final ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); + private final ConcurrentMap perDeviceLimits = new ConcurrentHashMap<>(); + + private final TransportRateLimitFactory rateLimitFactory; + private final TransportTenantProfileCache tenantProfileCache; + + public DefaultTransportRateLimitService(TransportRateLimitFactory rateLimitFactory, TransportTenantProfileCache tenantProfileCache) { + this.rateLimitFactory = rateLimitFactory; + this.tenantProfileCache = tenantProfileCache; + } + + @Override + public TransportRateLimit getRateLimit(TenantId tenantId, TransportRateLimitType limitType) { + TransportRateLimit[] limits = perTenantLimits.get(tenantId); + if (limits == null) { + limits = fetchProfileAndInit(tenantId); + perTenantLimits.put(tenantId, limits); + } + return limits[limitType.ordinal()]; + } + + @Override + public TransportRateLimit getRateLimit(TenantId tenantId, DeviceId deviceId, TransportRateLimitType limitType) { + TransportRateLimit[] limits = perDeviceLimits.get(deviceId); + if (limits == null) { + limits = fetchProfileAndInit(tenantId); + perDeviceLimits.put(deviceId, limits); + } + return limits[limitType.ordinal()]; + } + + @Override + public void update(TenantProfileUpdateResult update) { + TransportRateLimit[] newLimits = createTransportRateLimits(update.getProfile()); + for (TenantId tenantId : update.getAffectedTenants()) { + mergeLimits(tenantId, newLimits); + } + } + + @Override + public void update(TenantId tenantId) { + mergeLimits(tenantId, fetchProfileAndInit(tenantId)); + } + + public void mergeLimits(TenantId tenantId, TransportRateLimit[] newRateLimits) { + TransportRateLimit[] oldRateLimits = perTenantLimits.get(tenantId); + if (oldRateLimits == null) { + perTenantLimits.put(tenantId, newRateLimits); + } else { + for (int i = 0; i < TransportRateLimitType.values().length; i++) { + TransportRateLimit newLimit = newRateLimits[i]; + TransportRateLimit oldLimit = oldRateLimits[i]; + if (newLimit != null && (oldLimit == null || !oldLimit.getConfiguration().equals(newLimit.getConfiguration()))) { + oldRateLimits[i] = newLimit; + } + } + } + } + + @Override + public void remove(TenantId tenantId) { + perTenantLimits.remove(tenantId); + } + + @Override + public void remove(DeviceId deviceId) { + perDeviceLimits.remove(deviceId); + } + + private TransportRateLimit[] fetchProfileAndInit(TenantId tenantId) { + return perTenantLimits.computeIfAbsent(tenantId, tmp -> createTransportRateLimits(tenantProfileCache.get(tenantId))); + } + + private TransportRateLimit[] createTransportRateLimits(TenantProfile tenantProfile) { + TenantProfileData profileData = tenantProfile.getProfileData(); + TransportRateLimit[] rateLimits = new TransportRateLimit[TransportRateLimitType.values().length]; + for (TransportRateLimitType type : TransportRateLimitType.values()) { + rateLimits[type.ordinal()] = rateLimitFactory.create(type, profileData.getProperties().get(type.getConfigurationKey())); + } + return rateLimits; + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java new file mode 100644 index 0000000000..d6d58d55ba --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +public class DummyTransportRateLimit implements TransportRateLimit { + + @Override + public String getConfiguration() { + return ""; + } + + @Override + public boolean tryConsume() { + return true; + } + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java new file mode 100644 index 0000000000..08fbb7ec5d --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.msg.tools.TbRateLimits; + +@RequiredArgsConstructor +public class SimpleTransportRateLimit implements TransportRateLimit { + + private final TbRateLimits rateLimit; + @Getter + private final String configuration; + + @Override + public boolean tryConsume() { + return rateLimit.tryConsume(); + } + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java new file mode 100644 index 0000000000..0901e1becc --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +public interface TransportRateLimit { + + String getConfiguration(); + + boolean tryConsume(); + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitFactory.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitFactory.java new file mode 100644 index 0000000000..54d83610c2 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitFactory.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +public interface TransportRateLimitFactory { + + TransportRateLimit create(TransportRateLimitType type, Object config); + + TransportRateLimit createDefault(TransportRateLimitType type); + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java new file mode 100644 index 0000000000..2f2e808a36 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; + +public interface TransportRateLimitService { + + TransportRateLimit getRateLimit(TenantId tenantId, TransportRateLimitType limit); + + TransportRateLimit getRateLimit(TenantId tenantId, DeviceId deviceId, TransportRateLimitType limit); + + void update(TenantProfileUpdateResult update); + + void update(TenantId tenantId); + + void remove(TenantId tenantId); + + void remove(DeviceId deviceId); + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java new file mode 100644 index 0000000000..becc5fbc86 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.limits; + +import lombok.Getter; + +public enum TransportRateLimitType { + + TENANT_MAX_MSGS("transport.tenant.max.msg"), + TENANT_MAX_DATA_POINTS("transport.tenant.max.dataPoints"), + DEVICE_MAX_MSGS("transport.device.max.msg"), + DEVICE_MAX_DATA_POINTS("transport.device.max.dataPoints"); + + @Getter + private final String configurationKey; + + TransportRateLimitType(String configurationKey) { + this.configurationKey = configurationKey; + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/profile/TenantProfileUpdateResult.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/profile/TenantProfileUpdateResult.java new file mode 100644 index 0000000000..53950e6de8 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/profile/TenantProfileUpdateResult.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.profile; + +import lombok.Data; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.Set; + +@Data +public class TenantProfileUpdateResult { + + private final TenantProfile profile; + private final Set affectedTenants; + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java similarity index 91% rename from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java rename to common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java index 4d955de70c..3b44fd6a54 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java @@ -21,7 +21,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.id.DeviceProfileId; -import org.thingsboard.server.common.transport.TransportProfileCache; +import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import java.util.Optional; @@ -31,13 +31,13 @@ import java.util.concurrent.ConcurrentMap; @Slf4j @Component @ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") -public class DefaultTransportProfileCache implements TransportProfileCache { +public class DefaultTransportDeviceProfileCache implements TransportDeviceProfileCache { private final ConcurrentMap deviceProfiles = new ConcurrentHashMap<>(); private final DataDecodingEncodingService dataDecodingEncodingService; - public DefaultTransportProfileCache(DataDecodingEncodingService dataDecodingEncodingService) { + public DefaultTransportDeviceProfileCache(DataDecodingEncodingService dataDecodingEncodingService) { this.dataDecodingEncodingService = dataDecodingEncodingService; } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 47abc07286..7631658602 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -29,25 +29,35 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceQueue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; -import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; +import org.thingsboard.server.common.stats.MessagesStats; +import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.common.transport.SessionMsgListener; -import org.thingsboard.server.common.transport.TransportProfileCache; +import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; +import org.thingsboard.server.common.transport.limits.TransportRateLimit; +import org.thingsboard.server.common.transport.limits.TransportRateLimitService; +import org.thingsboard.server.common.transport.limits.TransportRateLimitType; +import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; @@ -69,15 +79,13 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; -import org.thingsboard.server.common.stats.MessagesStats; -import org.thingsboard.server.common.stats.StatsFactory; -import org.thingsboard.server.common.stats.StatsType; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -98,12 +106,6 @@ import java.util.concurrent.atomic.AtomicInteger; @ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") public class DefaultTransportService implements TransportService { - @Value("${transport.rate_limits.enabled}") - private boolean rateLimitEnabled; - @Value("${transport.rate_limits.tenant}") - private String perTenantLimitsConf; - @Value("${transport.rate_limits.device}") - private String perDevicesLimitsConf; @Value("${transport.sessions.inactivity_timeout}") private long sessionInactivityTimeout; @Value("${transport.sessions.report_timeout}") @@ -119,7 +121,10 @@ public class DefaultTransportService implements TransportService { private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; private final StatsFactory statsFactory; - private final TransportProfileCache transportProfileCache; + private final TransportDeviceProfileCache deviceProfileCache; + private final TransportTenantProfileCache tenantProfileCache; + private final TransportRateLimitService rateLimitService; + private final DataDecodingEncodingService dataDecodingEncodingService; protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate; protected TbQueueProducer> ruleEngineMsgProducer; @@ -132,14 +137,11 @@ public class DefaultTransportService implements TransportService { protected ScheduledExecutorService schedulerExecutor; protected ExecutorService transportCallbackExecutor; + private ExecutorService mainConsumerExecutor; private final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final Map toServerRpcPendingMap = new ConcurrentHashMap<>(); - //TODO 3.2: @ybondarenko Implement cleanup of this maps. - private final ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); - private final ConcurrentMap perDeviceLimits = new ConcurrentHashMap<>(); - private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); private volatile boolean stopped = false; public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, @@ -147,22 +149,22 @@ public class DefaultTransportService implements TransportService { TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory, - TransportProfileCache transportProfileCache) { + TransportDeviceProfileCache deviceProfileCache, + TransportTenantProfileCache tenantProfileCache, + TransportRateLimitService rateLimitService, DataDecodingEncodingService dataDecodingEncodingService) { this.serviceInfoProvider = serviceInfoProvider; this.queueProvider = queueProvider; this.producerProvider = producerProvider; this.partitionService = partitionService; this.statsFactory = statsFactory; - this.transportProfileCache = transportProfileCache; + this.deviceProfileCache = deviceProfileCache; + this.tenantProfileCache = tenantProfileCache; + this.rateLimitService = rateLimitService; + this.dataDecodingEncodingService = dataDecodingEncodingService; } @PostConstruct public void init() { - if (rateLimitEnabled) { - //Just checking the configuration parameters - new TbRateLimits(perTenantLimitsConf); - new TbRateLimits(perDevicesLimitsConf); - } this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer"); this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer"); this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer"); @@ -177,6 +179,7 @@ public class DefaultTransportService implements TransportService { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId()); transportNotificationsConsumer.subscribe(Collections.singleton(tpi)); transportApiRequestTemplate.init(); + mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); mainConsumerExecutor.execute(() -> { while (!stopped) { try { @@ -208,10 +211,6 @@ public class DefaultTransportService implements TransportService { @PreDestroy public void destroy() { - if (rateLimitEnabled) { - perTenantLimits.clear(); - perDeviceLimits.clear(); - } stopped = true; if (transportNotificationsConsumer != null) { @@ -232,7 +231,7 @@ public class DefaultTransportService implements TransportService { } @Override - public ScheduledExecutorService getSchedulerExecutor(){ + public ScheduledExecutorService getSchedulerExecutor() { return this.schedulerExecutor; } @@ -242,12 +241,12 @@ public class DefaultTransportService implements TransportService { } @Override - public TransportProtos.GetTenantRoutingInfoResponseMsg getRoutingInfo(TransportProtos.GetTenantRoutingInfoRequestMsg msg) { + public TransportProtos.GetEntityProfileResponseMsg getRoutingInfo(TransportProtos.GetEntityProfileRequestMsg msg) { TbProtoQueueMsg protoMsg = - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetTenantRoutingInfoRequestMsg(msg).build()); + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); try { TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); - return response.getValue().getGetTenantRoutingInfoResponseMsg(); + return response.getValue().getEntityProfileResponseMsg(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -289,7 +288,7 @@ public class DefaultTransportService implements TransportService { result.deviceInfo(tdi); ByteString profileBody = msg.getProfileBody(); if (profileBody != null && !profileBody.isEmpty()) { - DeviceProfile profile = transportProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody); + DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody); if (transportType != DeviceTransportType.DEFAULT && profile != null && profile.getTransportType() != DeviceTransportType.DEFAULT && profile.getTransportType() != transportType) { log.debug("[{}] Device profile [{}] has different transport type: {}, expected: {}", tdi.getDeviceId(), tdi.getDeviceProfileId(), profile.getTransportType(), transportType); @@ -315,7 +314,7 @@ public class DefaultTransportService implements TransportService { result.deviceInfo(tdi); ByteString profileBody = msg.getProfileBody(); if (profileBody != null && !profileBody.isEmpty()) { - result.deviceProfile(transportProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody)); + result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody)); } } return result.build(); @@ -339,8 +338,8 @@ public class DefaultTransportService implements TransportService { log.trace("Processing msg: {}", requestMsg); TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setProvisionDeviceRequestMsg(requestMsg).build()); ListenableFuture response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> - tmp.getValue().getProvisionDeviceResponseMsg() - , MoreExecutors.directExecutor()); + tmp.getValue().getProvisionDeviceResponseMsg() + , MoreExecutors.directExecutor()); AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor); } @@ -580,12 +579,11 @@ public class DefaultTransportService implements TransportService { if (log.isTraceEnabled()) { log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg); } - if (!rateLimitEnabled) { - return true; - } TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); - TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(tenantId, id -> new TbRateLimits(perTenantLimitsConf)); - if (!rateLimits.tryConsume()) { + + TransportRateLimit tenantRateLimit = rateLimitService.getRateLimit(tenantId, TransportRateLimitType.TENANT_MAX_MSGS); + + if (!tenantRateLimit.tryConsume()) { if (callback != null) { callback.onError(new TbRateLimitsException(EntityType.TENANT)); } @@ -595,8 +593,8 @@ public class DefaultTransportService implements TransportService { return false; } DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); - rateLimits = perDeviceLimits.computeIfAbsent(deviceId, id -> new TbRateLimits(perDevicesLimitsConf)); - if (!rateLimits.tryConsume()) { + TransportRateLimit deviceRateLimit = rateLimitService.getRateLimit(tenantId, deviceId, TransportRateLimitType.DEVICE_MAX_MSGS); + if (!deviceRateLimit.tryConsume()) { if (callback != null) { callback.onError(new TbRateLimitsException(EntityType.DEVICE)); } @@ -637,16 +635,40 @@ public class DefaultTransportService implements TransportService { deregisterSession(md.getSessionInfo()); } } else { - if (toSessionMsg.hasDeviceProfileUpdateMsg()) { - DeviceProfile deviceProfile = transportProfileCache.put(toSessionMsg.getDeviceProfileUpdateMsg().getData()); - if (deviceProfile != null) { - onProfileUpdate(deviceProfile); + if (toSessionMsg.hasEntityUpdateMsg()) { + TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg(); + EntityType entityType = EntityType.valueOf(msg.getEntityType()); + if (EntityType.DEVICE_PROFILE.equals(entityType)) { + DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData()); + if (deviceProfile != null) { + onProfileUpdate(deviceProfile); + } + } else if (EntityType.TENANT_PROFILE.equals(entityType)) { + TenantProfileUpdateResult update = tenantProfileCache.put(msg.getData()); + rateLimitService.update(update); + } else if (EntityType.TENANT.equals(entityType)) { + Optional profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); + if (profileOpt.isPresent()) { + Tenant tenant = profileOpt.get(); + boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId()); + if (updated) { + rateLimitService.update(tenant.getId()); + } + } + } + } else if (toSessionMsg.hasEntityDeleteMsg()) { + TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg(); + EntityType entityType = EntityType.valueOf(msg.getEntityType()); + UUID entityUuid = new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()); + if (EntityType.DEVICE_PROFILE.equals(entityType)) { + deviceProfileCache.evict(new DeviceProfileId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()))); + } else if (EntityType.TENANT_PROFILE.equals(entityType)) { + tenantProfileCache.remove(new TenantProfileId(entityUuid)); + } else if (EntityType.TENANT.equals(entityType)) { + rateLimitService.remove(new TenantId(entityUuid)); + } else if (EntityType.DEVICE.equals(entityType)) { + rateLimitService.remove(new DeviceId(entityUuid)); } - } else if (toSessionMsg.hasDeviceProfileDeleteMsg()) { - transportProfileCache.evict(new DeviceProfileId(new UUID( - toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdMSB(), - toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdLSB() - ))); } else { //TODO: should we notify the device actor about missed session? log.debug("[{}] Missing session.", sessionId); @@ -654,38 +676,6 @@ public class DefaultTransportService implements TransportService { } } - @Override - public void getDeviceProfile(DeviceProfileId deviceProfileId, TransportServiceCallback callback) { - DeviceProfile deviceProfile = transportProfileCache.get(deviceProfileId); - if (deviceProfile != null) { - callback.onSuccess(deviceProfile); - } else { - log.trace("Processing device profile request: [{}]", deviceProfileId); - TransportProtos.GetDeviceProfileRequestMsg msg = TransportProtos.GetDeviceProfileRequestMsg.newBuilder() - .setProfileIdMSB(deviceProfileId.getId().getMostSignificantBits()) - .setProfileIdLSB(deviceProfileId.getId().getLeastSignificantBits()) - .build(); - TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), - TransportApiRequestMsg.newBuilder().setGetDeviceProfileRequestMsg(msg).build()); - AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), - response -> { - ByteString devProfileBody = response.getValue().getGetDeviceProfileResponseMsg().getData(); - if (devProfileBody != null && !devProfileBody.isEmpty()) { - DeviceProfile profile = transportProfileCache.put(devProfileBody); - if (profile != null) { - callback.onSuccess(profile); - } else { - log.warn("Failed to decode device profile: {}", devProfileBody); - callback.onError(new IllegalArgumentException("Failed to decode device profile!")); - } - } else { - log.warn("Failed to find device profile: [{}]", deviceProfileId); - callback.onError(new IllegalArgumentException("Failed to find device profile!")); - } - }, callback::onError, transportCallbackExecutor); - } - } - @Override public void onProfileUpdate(DeviceProfile deviceProfile) { long deviceProfileIdMSB = deviceProfile.getId().getId().getMostSignificantBits(); @@ -750,7 +740,7 @@ public class DefaultTransportService implements TransportService { private RuleChainId resolveRuleChainId(TransportProtos.SessionInfoProto sessionInfo) { DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); - DeviceProfile deviceProfile = transportProfileCache.get(deviceProfileId); + DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId); RuleChainId ruleChainId; if (deviceProfile == null) { log.warn("[{}] Device profile is null!", deviceProfileId); @@ -779,7 +769,7 @@ public class DefaultTransportService implements TransportService { } } - private class StatsCallback implements TbQueueCallback { + private static class StatsCallback implements TbQueueCallback { private final TbQueueCallback callback; private final MessagesStats stats; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java new file mode 100644 index 0000000000..717627a60e --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java @@ -0,0 +1,154 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.service; + +import com.google.protobuf.ByteString; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportTenantProfileCache; +import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.TenantRoutingInfo; +import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; + +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Component +@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") +@Slf4j +public class DefaultTransportTenantProfileCache implements TransportTenantProfileCache { + + private final Lock tenantProfileFetchLock = new ReentrantLock(); + private final ConcurrentMap profiles = new ConcurrentHashMap<>(); + private final ConcurrentMap tenantIds = new ConcurrentHashMap<>(); + private final ConcurrentMap> tenantProfileIds = new ConcurrentHashMap<>(); + private final DataDecodingEncodingService dataDecodingEncodingService; + + private TransportService transportService; + + @Lazy + @Autowired + public void setTransportService(TransportService transportService) { + this.transportService = transportService; + } + + public DefaultTransportTenantProfileCache(DataDecodingEncodingService dataDecodingEncodingService) { + this.dataDecodingEncodingService = dataDecodingEncodingService; + } + + @Override + public TenantProfile get(TenantId tenantId) { + return getTenantProfile(tenantId); + } + + @Override + public TenantProfileUpdateResult put(ByteString profileBody) { + Optional profileOpt = dataDecodingEncodingService.decode(profileBody.toByteArray()); + if (profileOpt.isPresent()) { + TenantProfile newProfile = profileOpt.get(); + log.trace("[{}] put: {}", newProfile.getId(), newProfile); + return new TenantProfileUpdateResult(newProfile, tenantProfileIds.get(newProfile.getId())); + } else { + log.warn("Failed to decode profile: {}", profileBody.toString()); + return new TenantProfileUpdateResult(null, Collections.emptySet()); + } + } + + @Override + public boolean put(TenantId tenantId, TenantProfileId profileId) { + log.trace("[{}] put: {}", tenantId, profileId); + TenantProfileId oldProfileId = tenantIds.get(tenantId); + if (oldProfileId != null && !oldProfileId.equals(profileId)) { + tenantProfileIds.computeIfAbsent(oldProfileId, id -> ConcurrentHashMap.newKeySet()).remove(tenantId); + tenantIds.put(tenantId, profileId); + tenantProfileIds.computeIfAbsent(profileId, id -> ConcurrentHashMap.newKeySet()).add(tenantId); + return true; + } else { + return false; + } + } + + @Override + public Set remove(TenantProfileId profileId) { + Set tenants = tenantProfileIds.remove(profileId); + if (tenants != null) { + tenants.forEach(tenantIds::remove); + } + profiles.remove(profileId); + return tenants; + } + + private TenantProfile getTenantProfile(TenantId tenantId) { + TenantProfile profile = null; + TenantProfileId tenantProfileId = tenantIds.get(tenantId); + if (tenantProfileId != null) { + profile = profiles.get(tenantProfileId); + } + if (profile == null) { + tenantProfileFetchLock.lock(); + try { + tenantProfileId = tenantIds.get(tenantId); + if (tenantProfileId != null) { + profile = profiles.get(tenantProfileId); + } + if (profile == null) { + TransportProtos.GetEntityProfileRequestMsg msg = TransportProtos.GetEntityProfileRequestMsg.newBuilder() + .setEntityType(EntityType.TENANT.name()) + .setEntityIdMSB(tenantId.getId().getMostSignificantBits()) + .setEntityIdLSB(tenantId.getId().getLeastSignificantBits()) + .build(); + TransportProtos.GetEntityProfileResponseMsg routingInfo = transportService.getRoutingInfo(msg); + Optional profileOpt = dataDecodingEncodingService.decode(routingInfo.getData().toByteArray()); + if (profileOpt.isPresent()) { + profile = profileOpt.get(); + TenantProfile existingProfile = profiles.get(profile.getId()); + if (existingProfile != null) { + profile = existingProfile; + } else { + profiles.put(profile.getId(), profile); + } + tenantProfileIds.computeIfAbsent(profile.getId(), id -> ConcurrentHashMap.newKeySet()).add(tenantId); + tenantIds.put(tenantId, profile.getId()); + } else { + log.warn("[{}] Can't decode tenant profile: {}", tenantId, routingInfo.getData()); + throw new RuntimeException("Can't decode tenant profile!"); + } + } + } finally { + tenantProfileFetchLock.unlock(); + } + } + return profile; + } + + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java index f2534a64c9..55dee282ec 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java @@ -16,14 +16,11 @@ package org.thingsboard.server.common.transport.service; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg; +import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.queue.discovery.TenantRoutingInfo; import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; @@ -32,21 +29,16 @@ import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; @ConditionalOnExpression("'${service.type:null}'=='tb-transport'") public class TransportTenantRoutingInfoService implements TenantRoutingInfoService { - private TransportService transportService; + private TransportTenantProfileCache tenantProfileCache; - @Lazy - @Autowired - public void setTransportService(TransportService transportService) { - this.transportService = transportService; + public TransportTenantRoutingInfoService(TransportTenantProfileCache tenantProfileCache) { + this.tenantProfileCache = tenantProfileCache; } @Override public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { - GetTenantRoutingInfoRequestMsg msg = GetTenantRoutingInfoRequestMsg.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .build(); - GetTenantRoutingInfoResponseMsg routingInfo = transportService.getRoutingInfo(msg); - return new TenantRoutingInfo(tenantId, routingInfo.getIsolatedTbCore(), routingInfo.getIsolatedTbRuleEngine()); + TenantProfile profile = tenantProfileCache.get(tenantId); + return new TenantRoutingInfo(tenantId, profile.isIsolatedTbCore(), profile.isIsolatedTbRuleEngine()); } + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/DataDecodingEncodingService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/DataDecodingEncodingService.java index 1b10cb5dc3..bf70fa5ef1 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/DataDecodingEncodingService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/DataDecodingEncodingService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.common.transport.util; -import org.thingsboard.server.common.msg.TbActorMsg; - import java.util.Optional; public interface DataDecodingEncodingService { diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 3f57b32fab..229196d325 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -49,10 +49,6 @@ transport: sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" - rate_limits: - enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" - tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" - device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 77d5f30fa7..6aaa42ca18 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -42,10 +42,6 @@ transport: sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" - rate_limits: - enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" - tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" - device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index f01b15c77a..f9341dfb86 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -71,10 +71,6 @@ transport: sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" - rate_limits: - enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" - tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" - device: "${TB_TRANSPORT_RATE_LIMITS_DEVICE:10:1,300:60}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"