diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 940bd6777e..5a7e9b631d 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -476,8 +476,13 @@ public class MqttClientTest extends AbstractContainerTest { return getMqttClient(deviceCredentials.getCredentialsId(), listener); } + private String getOwnerId() { + return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]"; + } + private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index de11df2623..5e2a9f5fe6 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -418,8 +418,13 @@ public class MqttGatewayClientTest extends AbstractContainerTest { return testRestClient.getDeviceById(createdDeviceId); } + private String getOwnerId() { + return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]"; + } + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); + clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(deviceCredentials.getCredentialsId()); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index 6b3a4e009e..a40c30ef96 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -320,14 +320,12 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler try { if (cause instanceof IOException) { if (log.isDebugEnabled()) { - log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause); - } else if (log.isInfoEnabled()) { - log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(), - cause.getMessage()); + log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause); + } else { + log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage()); } } else { - log.warn("exceptionCaught", cause); + log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause); } } finally { ReferenceCountUtil.release(cause); diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java index 1e20a842c3..10cee5d5fc 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java @@ -19,6 +19,8 @@ import io.netty.channel.Channel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslContext; +import lombok.Getter; +import lombok.Setter; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -26,10 +28,12 @@ import java.util.Random; @SuppressWarnings({"WeakerAccess", "unused"}) public final class MqttClientConfig { - private final SslContext sslContext; private final String randomClientId; + @Getter + @Setter + private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes private String clientId; private int timeoutSeconds = 60; private MqttVersion protocolVersion = MqttVersion.MQTT_3_1; diff --git a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java index f39ca01110..04c2be1740 100644 --- a/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java +++ b/netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java @@ -122,6 +122,7 @@ public class MqttIntegrationTest { private MqttClient initClient() throws Exception { MqttClientConfig config = new MqttClientConfig(); + config.setOwnerId("MqttIntegrationTest"); config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); config.setReconnectDelay(RECONNECT_DELAY_SECONDS); MqttClient client = MqttClient.create(config, null, handlerExecutor); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 49b31cb33c..1376f29710 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttConnectResult; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; @@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode { } } + String getOwnerId(TbContext ctx) { + return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]"; + } + protected MqttClient initClient(TbContext ctx) throws Exception { MqttClientConfig config = new MqttClientConfig(getSslContext()); + config.setOwnerId(getOwnerId(ctx)); if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) { config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ? this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId());