diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java index c4157c0714..6c08b61f09 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -51,7 +51,7 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes private int maxInflightMessages; @Test - public void getServiceConfigurationRpcForDeviceTest() throws Exception { + public void getSessionLimitsRpcForDeviceTest() throws Exception { loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); @@ -75,29 +75,30 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes client.setCallback(callback); client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); - client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes()); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { - }); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); assertNotNull(response); - assertEquals(response.size(), 6); - assertEquals(response.get("deviceMsgRateLimit"), profileConfiguration.getTransportDeviceMsgRateLimit()); - assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); - assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); + assertEquals(4, response.size()); assertEquals(response.get("maxPayloadSize"), maxPayloadSize); assertEquals(response.get("maxInflightMessages"), maxInflightMessages); assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + Map rateLimits = (Map) response.get("rateLimits"); + assertEquals(3, rateLimits.size()); + assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportDeviceMsgRateLimit()); + assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); + assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); client.disconnect(); } @Test - public void getServiceConfigurationRpcForGatewayTest() throws Exception { + public void getSessionLimitsRpcForGatewayTest() throws Exception { loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); @@ -125,25 +126,26 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes client.setCallback(callback); client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); - client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes()); assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { - }); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); assertNotNull(response); - assertEquals(response.size(), 9); - assertEquals(response.get("gatewayMsgRateLimit"), profileConfiguration.getTransportGatewayMsgRateLimit()); - assertEquals(response.get("gatewayTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit()); - assertEquals(response.get("gatewayTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); - assertEquals(response.get("gatewayDeviceMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit()); - assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); - assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + assertEquals(4, response.size()); assertEquals(response.get("maxPayloadSize"), maxPayloadSize); assertEquals(response.get("maxInflightMessages"), maxInflightMessages); assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + Map rateLimits = (Map) response.get("rateLimits"); + assertEquals(6, rateLimits.size()); + assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportGatewayMsgRateLimit()); + assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit()); + assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); + assertEquals(rateLimits.get("deviceMessages"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit()); + assertEquals(rateLimits.get("deviceTelemetryMessages"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); + assertEquals(rateLimits.get("deviceTelemetryDataPoints"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); client.disconnect(); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 151131091b..0fce2048a4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -58,11 +58,9 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; @@ -135,7 +133,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); - private static final String SERVICE_CONFIGURATION = "getServiceConfiguration"; + private static final String SESSION_LIMITS = "getSessionLimits"; private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; @@ -500,8 +498,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); toServerRpcSubTopicType = TopicType.V1; - if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) { - onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); + if (SESSION_LIMITS.equals(rpcRequestMsg.getMethodName())) { + onGetSessionLimitsRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); } else { transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); } @@ -1340,32 +1338,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { + private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId()); DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration(); - Map serviceConfiguration = new HashMap<>(); + Map sessionLimits = new HashMap<>(); + Map rateLimits = new HashMap<>(); if (sessionInfo.getIsGateway()) { - serviceConfiguration.put("gatewayMsgRateLimit", profile.getTransportGatewayMsgRateLimit()); - serviceConfiguration.put("gatewayTelemetryMsgRateLimit", profile.getTransportGatewayTelemetryMsgRateLimit()); - serviceConfiguration.put("gatewayTelemetryDataPointsRateLimit", profile.getTransportGatewayTelemetryDataPointsRateLimit()); - serviceConfiguration.put("gatewayDeviceMsgRateLimit", profile.getTransportGatewayDeviceMsgRateLimit()); - serviceConfiguration.put("gatewayDeviceTelemetryMsgRateLimit", profile.getTransportGatewayDeviceTelemetryMsgRateLimit()); - serviceConfiguration.put("gatewayDeviceTelemetryDataPointsRateLimit", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + rateLimits.put("messages", profile.getTransportGatewayMsgRateLimit()); + rateLimits.put("telemetryMessages", profile.getTransportGatewayTelemetryMsgRateLimit()); + rateLimits.put("telemetryDataPoints", profile.getTransportGatewayTelemetryDataPointsRateLimit()); + rateLimits.put("deviceMessages", profile.getTransportGatewayDeviceMsgRateLimit()); + rateLimits.put("deviceTelemetryMessages", profile.getTransportGatewayDeviceTelemetryMsgRateLimit()); + rateLimits.put("deviceTelemetryDataPoints", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); } else { - serviceConfiguration.put("deviceMsgRateLimit", profile.getTransportDeviceMsgRateLimit()); - serviceConfiguration.put("deviceTelemetryMsgRateLimit", profile.getTransportDeviceTelemetryMsgRateLimit()); - serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit()); + rateLimits.put("messages", profile.getTransportDeviceMsgRateLimit()); + rateLimits.put("telemetryMessages", profile.getTransportDeviceTelemetryMsgRateLimit()); + rateLimits.put("telemetryDataPoints", profile.getTransportDeviceTelemetryDataPointsRateLimit()); } - serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize()); - serviceConfiguration.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit()); - serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType()); + sessionLimits.put("rateLimits", rateLimits); + sessionLimits.put("maxPayloadSize", context.getMaxPayloadSize()); + sessionLimits.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit()); + sessionLimits.put("payloadType", deviceSessionCtx.getPayloadType()); ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder() .setRequestId(rpcRequestMsg.getRequestId()) - .setPayload(JacksonUtil.toString(serviceConfiguration)) + .setPayload(JacksonUtil.toString(sessionLimits)) .build(); onToServerRpcResponse(responseMsg);