diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java index a586e4f6fa..1f5430a0bb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java @@ -30,13 +30,13 @@ import java.util.Map; public class TbPubSubSubscriptionSettings { @Value("${queue.pubsub.queue-properties.core:}") private String coreProperties; - @Value("${queue.pubsub.queue-properties.rule-engine}") + @Value("${queue.pubsub.queue-properties.rule-engine:}") private String ruleEngineProperties; - @Value("${queue.pubsub.queue-properties.transport-api}") + @Value("${queue.pubsub.queue-properties.transport-api:}") private String transportApiProperties; - @Value("${queue.pubsub.queue-properties.notifications}") + @Value("${queue.pubsub.queue-properties.notifications:}") private String notificationsProperties; - @Value("${queue.pubsub.queue-properties.js-executor}") + @Value("${queue.pubsub.queue-properties.js-executor:}") private String jsExecutorProperties; @Value("${queue.pubsub.queue-properties.version-control:}") private String vcProperties; diff --git a/docker/.env b/docker/.env index f33ed50da5..27653a19ad 100644 --- a/docker/.env +++ b/docker/.env @@ -1,4 +1,4 @@ -TB_QUEUE_TYPE=kafka +TB_QUEUE_TYPE=pubsub # redis or redis-cluster CACHE=redis diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index ffd2e1f50a..f61ae550d0 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -107,14 +107,21 @@ public class ContainerTestSuite { case "aws-sqs": replaceInFile(targetDir, "queue-aws-sqs.env", Map.of("YOUR_KEY", getSysProp("blackBoxTests.awsKey"), - "YOUR_SECRET", "blackBoxTests.awsSecret", - "YOUR_REGION", "blackBoxTests.awsRegion")); + "YOUR_SECRET", getSysProp("blackBoxTests.awsSecret"), + "YOUR_REGION", getSysProp("blackBoxTests.awsRegion"))); break; case "rabbitmq": composeFiles.add(new File(targetDir + "docker-compose.rabbitmq-server.yml")); replaceInFile(targetDir, "queue-rabbitmq.env", Map.of("localhost", "rabbitmq")); break; + case "service-bus": + replaceInFile(targetDir, "queue-service-bus.env", + Map.of("YOUR_NAMESPACE_NAME", getSysProp("blackBoxTests.serviceBusNamespace"), + "YOUR_SAS_KEY_NAME", getSysProp("blackBoxTests.serviceBusSASPolicy"))); + replaceInFile(targetDir, "queue-service-bus.env", + Map.of("YOUR_SAS_KEY", getSysProp("blackBoxTests.serviceBusPrimaryKey"))); + break; default: throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java index 547ebaf575..0f75775808 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java @@ -74,8 +74,13 @@ public class WsClient extends WebSocketClient { public WsTelemetryResponse getLastMessage() { try { - latch.await(10, TimeUnit.SECONDS); - return this.message; + boolean result = latch.await(120, TimeUnit.SECONDS); + if (result) { + return this.message; + } else { + log.error("Timeout, ws message wasn't received"); + throw new RuntimeException("Timeout, ws message wasn't received"); + } } catch (InterruptedException e) { log.error("Timeout, ws message wasn't received"); } @@ -84,7 +89,11 @@ public class WsClient extends WebSocketClient { void waitForFirstReply() { try { - firstReply.await(10, TimeUnit.SECONDS); + boolean result = firstReply.await(10, TimeUnit.SECONDS); + if (!result) { + log.error("Timeout, ws message wasn't received"); + throw new RuntimeException("Timeout, ws message wasn't received"); + } } catch (InterruptedException e) { log.error("Timeout, ws message wasn't received"); throw new RuntimeException(e); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 336d75bb45..97e1836d06 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -277,7 +277,7 @@ public class MqttClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS); Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); @@ -320,8 +320,8 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server - TimeUnit.SECONDS.sleep(1); - MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(10); + MqttEvent responseFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); Assert.assertEquals(requestId, responseId); Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index ec77cbec36..d66db480ef 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -410,6 +410,8 @@ public class MqttGatewayClientTest extends AbstractContainerTest { String deviceName = "mqtt_device"; mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); + TimeUnit.SECONDS.sleep(60); + List relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON); Assert.assertEquals(1, relations.size()); diff --git a/msa/pom.xml b/msa/pom.xml index dd40348bc8..d32fceca68 100644 --- a/msa/pom.xml +++ b/msa/pom.xml @@ -46,6 +46,7 @@ web-ui tb-node transport + black-box-tests