diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java new file mode 100644 index 0000000000..6489054ba9 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -0,0 +1,148 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3.rpc; + +import io.netty.handler.codec.mqtt.MqttQoS; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import org.thingsboard.server.transport.mqtt.limits.GatewaySessionLimits; +import org.thingsboard.server.transport.mqtt.limits.SessionLimits; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; + +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC; + +@DaoSqlTest +public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTest { + + @Value("${transport.mqtt.netty.max_payload_size}") + private Integer maxPayloadSize; + + @Value("${transport.mqtt.msg_queue_size_per_device_limit}") + private int maxInflightMessages; + + @Test + public void getSessionLimitsRpcForDeviceTest() throws Exception { + loginSysAdmin(); + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); + + profileConfiguration.setTransportDeviceMsgRateLimit("20:600"); + profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600"); + profileConfiguration.setTransportDeviceTelemetryDataPointsRateLimit("40:600"); + + doPost("/api/tenantProfile", tenantProfile); + + var expectedLimits = new SessionLimits(); + var deviceLimits = new SessionLimits.SessionRateLimits(profileConfiguration.getTransportDeviceMsgRateLimit(), + profileConfiguration.getTransportDeviceTelemetryMsgRateLimit(), + profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); + expectedLimits.setRateLimits(deviceLimits); + expectedLimits.setMaxPayloadSize(maxPayloadSize); + expectedLimits.setMaxInflightMessages(maxInflightMessages); + + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("Test Get Service Configuration") + .transportPayloadType(TransportPayloadType.JSON) + .build(); + processBeforeTest(configProperties); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1"); + client.setCallback(callback); + client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); + + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes()); + + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); + + var payload = callback.getPayloadBytes(); + SessionLimits actualLimits = JacksonUtil.fromBytes(payload, SessionLimits.class); + assertEquals(expectedLimits, actualLimits); + + client.disconnect(); + } + + @Test + public void getSessionLimitsRpcForGatewayTest() throws Exception { + loginSysAdmin(); + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); + + profileConfiguration.setTransportGatewayMsgRateLimit("100:600"); + profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600"); + profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit("200:600"); + + profileConfiguration.setTransportGatewayDeviceMsgRateLimit("20:600"); + profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit("10:600"); + profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit("40:600"); + + doPost("/api/tenantProfile", tenantProfile); + + var expectedLimits = new GatewaySessionLimits(); + var gatewayLimits = new SessionLimits.SessionRateLimits(profileConfiguration.getTransportGatewayMsgRateLimit(), + profileConfiguration.getTransportGatewayTelemetryMsgRateLimit(), + profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); + var gatewayDeviceLimits = new SessionLimits.SessionRateLimits(profileConfiguration.getTransportGatewayDeviceMsgRateLimit(), + profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit(), + profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + expectedLimits.setGatewayRateLimits(gatewayLimits); + expectedLimits.setRateLimits(gatewayDeviceLimits); + expectedLimits.setMaxPayloadSize(maxPayloadSize); + expectedLimits.setMaxInflightMessages(maxInflightMessages); + + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .gatewayName("Test Get Service Configuration Gateway") + .transportPayloadType(TransportPayloadType.JSON) + .build(); + processBeforeTest(configProperties); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1"); + client.setCallback(callback); + client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); + + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes()); + + assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + + var payload = callback.getPayloadBytes(); + SessionLimits actualLimits = JacksonUtil.fromBytes(payload, GatewaySessionLimits.class); + assertEquals(expectedLimits, actualLimits); + + client.disconnect(); + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 16d72ff89f..b8fd79be4b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.TransportContext; +import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; @@ -51,6 +52,10 @@ public class MqttTransportContext extends TransportContext { @Autowired private ProtoMqttAdaptor protoMqttAdaptor; + @Getter + @Autowired + private TransportTenantProfileCache tenantProfileCache; + @Getter @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index eeeb287dc8..4212386410 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -60,6 +60,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; @@ -78,6 +79,8 @@ import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; +import org.thingsboard.server.transport.mqtt.limits.GatewaySessionLimits; +import org.thingsboard.server.transport.mqtt.limits.SessionLimits; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; @@ -130,6 +133,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); + private static final String SESSION_LIMITS = "getSessionLimits"; private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; @@ -493,8 +497,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); toServerRpcSubTopicType = TopicType.V1; + if (SESSION_LIMITS.equals(rpcRequestMsg.getMethodName())) { + onGetSessionLimitsRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); + } else { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + } } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); @@ -928,7 +936,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } else { log.debug("[{}] Failed to process unsubscription [{}] to [{}] - Subscription not found", sessionId, mqttMsg.variableHeader().messageId(), topicName); - unSubResults.add((short)MqttReasonCodes.UnsubAck.NO_SUBSCRIPTION_EXISTED.byteValue()); + unSubResults.add((short) MqttReasonCodes.UnsubAck.NO_SUBSCRIPTION_EXISTED.byteValue()); } } if (!activityReported) { @@ -1330,6 +1338,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { + var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId()); + DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration(); + + SessionLimits sessionLimits; + + if (sessionInfo.getIsGateway()) { + var gatewaySessionLimits = new GatewaySessionLimits(); + var gatewayLimits = new SessionLimits.SessionRateLimits(profile.getTransportGatewayMsgRateLimit(), + profile.getTransportGatewayTelemetryMsgRateLimit(), + profile.getTransportGatewayTelemetryDataPointsRateLimit()); + var gatewayDeviceLimits = new SessionLimits.SessionRateLimits(profile.getTransportGatewayDeviceMsgRateLimit(), + profile.getTransportGatewayDeviceTelemetryMsgRateLimit(), + profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + gatewaySessionLimits.setGatewayRateLimits(gatewayLimits); + gatewaySessionLimits.setRateLimits(gatewayDeviceLimits); + sessionLimits = gatewaySessionLimits; + } else { + var rateLimits = new SessionLimits.SessionRateLimits(profile.getTransportDeviceMsgRateLimit(), + profile.getTransportDeviceTelemetryMsgRateLimit(), + profile.getTransportDeviceTelemetryDataPointsRateLimit()); + sessionLimits = new SessionLimits(); + sessionLimits.setRateLimits(rateLimits); + } + sessionLimits.setMaxPayloadSize(context.getMaxPayloadSize()); + sessionLimits.setMaxInflightMessages(context.getMessageQueueSizePerDeviceLimit()); + + ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); + + TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder() + .setRequestId(rpcRequestMsg.getRequestId()) + .setPayload(JacksonUtil.toString(sessionLimits)) + .build(); + + onToServerRpcResponse(responseMsg); + } + private void handleToSparkplugDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws ThingsboardException { SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); SparkplugRpcRequestHeader header; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/GatewaySessionLimits.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/GatewaySessionLimits.java new file mode 100644 index 0000000000..6b59f6bbbd --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/GatewaySessionLimits.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.limits; + +import lombok.Data; + +@Data +public class GatewaySessionLimits extends SessionLimits { + + private SessionRateLimits gatewayRateLimits; + +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java new file mode 100644 index 0000000000..b607bfadfc --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.limits; + +import lombok.Data; +import org.thingsboard.server.common.data.TransportPayloadType; + +@Data +public class SessionLimits { + + private int maxPayloadSize; + private int maxInflightMessages; + private SessionRateLimits rateLimits; + + public record SessionRateLimits(String messages, String telemetryMessages, String telemetryDataPoints) {} +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 6562fa65bf..a3b254c99e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -82,6 +82,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter(); private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); + @Getter private volatile TransportPayloadType payloadType = TransportPayloadType.JSON; private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor; private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 4c30609e18..1558bbc08b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -40,6 +41,8 @@ public abstract class DeviceAwareSessionContext implements SessionContext { @Getter private volatile DeviceId deviceId; @Getter + private volatile TenantId tenantId; + @Getter protected volatile TransportDeviceInfo deviceInfo; @Getter @Setter @@ -58,6 +61,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext { public void setDeviceInfo(TransportDeviceInfo deviceInfo) { this.deviceInfo = deviceInfo; this.deviceId = deviceInfo.getDeviceId(); + this.tenantId = deviceInfo.getTenantId(); } @Override @@ -65,7 +69,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext { this.sessionInfo = sessionInfo; this.deviceProfile = deviceProfile; this.deviceInfo.setDeviceType(deviceProfile.getName()); - } @Override