Browse Source

MqttClientConfig - ownerId added for exceptions logging purposes. MqttChannelHandler - improved logging

pull/8996/head
Sergey Matvienko 3 years ago
parent
commit
ed6614af71
  1. 5
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
  2. 5
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java
  3. 10
      netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java
  4. 6
      netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java
  5. 1
      netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java
  6. 6
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java

5
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java

@ -476,8 +476,13 @@ public class MqttClientTest extends AbstractContainerTest {
return getMqttClient(deviceCredentials.getCredentialsId(), listener);
}
private String getOwnerId() {
return "Tenant[" + device.getTenantId().getId() + "]MqttClientTestDevice[" + device.getId().getId() + "]";
}
private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setOwnerId(getOwnerId());
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(username);
MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);

5
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java

@ -418,8 +418,13 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
return testRestClient.getDeviceById(createdDeviceId);
}
private String getOwnerId() {
return "Tenant[" + gatewayDevice.getTenantId().getId() + "]MqttGatewayClientTestDevice[" + gatewayDevice.getId().getId() + "]";
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setOwnerId(getOwnerId());
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(deviceCredentials.getCredentialsId());
MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);

10
netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java

@ -320,14 +320,12 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
try {
if (cause instanceof IOException) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] IOException: ", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(),
cause);
} else if (log.isInfoEnabled()) {
log.info("[{}][{}][{}] IOException: {}", client.getClientConfig().getClientId(), client.getClientConfig().getUsername() , ctx.channel().remoteAddress(),
cause.getMessage());
log.debug("[{}] IOException: ", client.getClientConfig().getOwnerId(), cause);
} else {
log.info("[{}] IOException: {}", client.getClientConfig().getOwnerId(), cause.getMessage());
}
} else {
log.warn("exceptionCaught", cause);
log.warn("[{}] exceptionCaught", client.getClientConfig().getOwnerId(), cause);
}
} finally {
ReferenceCountUtil.release(cause);

6
netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientConfig.java

@ -19,6 +19,8 @@ import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import lombok.Getter;
import lombok.Setter;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -26,10 +28,12 @@ import java.util.Random;
@SuppressWarnings({"WeakerAccess", "unused"})
public final class MqttClientConfig {
private final SslContext sslContext;
private final String randomClientId;
@Getter
@Setter
private String ownerId; // [TenantId][IntegrationId] or [TenantId][RuleNodeId] for exceptions logging purposes
private String clientId;
private int timeoutSeconds = 60;
private MqttVersion protocolVersion = MqttVersion.MQTT_3_1;

1
netty-mqtt/src/test/java/org/thingsboard/mqtt/integration/MqttIntegrationTest.java

@ -122,6 +122,7 @@ public class MqttIntegrationTest {
private MqttClient initClient() throws Exception {
MqttClientConfig config = new MqttClientConfig();
config.setOwnerId("MqttIntegrationTest");
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
MqttClient client = MqttClient.create(config, null, handlerExecutor);

6
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java

@ -25,7 +25,6 @@ import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttConnectResult;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
@ -105,8 +104,13 @@ public class TbMqttNode extends TbAbstractExternalNode {
}
}
String getOwnerId(TbContext ctx) {
return "Tenant[" + ctx.getTenantId().getId() + "]RuleNode[" + ctx.getSelf().getId().getId() + "]";
}
protected MqttClient initClient(TbContext ctx) throws Exception {
MqttClientConfig config = new MqttClientConfig(getSslContext());
config.setOwnerId(getOwnerId(ctx));
if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ?
this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId());

Loading…
Cancel
Save