Browse Source

Fix TbPubSubSubscriptionSettings - added default values. Fixed WsClient - throw exception properly on latch timeout. Tuned timeouts for tests

pull/6933/head
Volodymyr Babak 4 years ago
parent
commit
69e2eef64d
  1. 8
      common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java
  2. 2
      docker/.env
  3. 11
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
  4. 15
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
  5. 6
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
  6. 2
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java
  7. 1
      msa/pom.xml

8
common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java

@ -30,13 +30,13 @@ import java.util.Map;
public class TbPubSubSubscriptionSettings {
@Value("${queue.pubsub.queue-properties.core:}")
private String coreProperties;
@Value("${queue.pubsub.queue-properties.rule-engine}")
@Value("${queue.pubsub.queue-properties.rule-engine:}")
private String ruleEngineProperties;
@Value("${queue.pubsub.queue-properties.transport-api}")
@Value("${queue.pubsub.queue-properties.transport-api:}")
private String transportApiProperties;
@Value("${queue.pubsub.queue-properties.notifications}")
@Value("${queue.pubsub.queue-properties.notifications:}")
private String notificationsProperties;
@Value("${queue.pubsub.queue-properties.js-executor}")
@Value("${queue.pubsub.queue-properties.js-executor:}")
private String jsExecutorProperties;
@Value("${queue.pubsub.queue-properties.version-control:}")
private String vcProperties;

2
docker/.env

@ -1,4 +1,4 @@
TB_QUEUE_TYPE=kafka
TB_QUEUE_TYPE=pubsub
# redis or redis-cluster
CACHE=redis

11
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java

@ -107,14 +107,21 @@ public class ContainerTestSuite {
case "aws-sqs":
replaceInFile(targetDir, "queue-aws-sqs.env",
Map.of("YOUR_KEY", getSysProp("blackBoxTests.awsKey"),
"YOUR_SECRET", "blackBoxTests.awsSecret",
"YOUR_REGION", "blackBoxTests.awsRegion"));
"YOUR_SECRET", getSysProp("blackBoxTests.awsSecret"),
"YOUR_REGION", getSysProp("blackBoxTests.awsRegion")));
break;
case "rabbitmq":
composeFiles.add(new File(targetDir + "docker-compose.rabbitmq-server.yml"));
replaceInFile(targetDir, "queue-rabbitmq.env",
Map.of("localhost", "rabbitmq"));
break;
case "service-bus":
replaceInFile(targetDir, "queue-service-bus.env",
Map.of("YOUR_NAMESPACE_NAME", getSysProp("blackBoxTests.serviceBusNamespace"),
"YOUR_SAS_KEY_NAME", getSysProp("blackBoxTests.serviceBusSASPolicy")));
replaceInFile(targetDir, "queue-service-bus.env",
Map.of("YOUR_SAS_KEY", getSysProp("blackBoxTests.serviceBusPrimaryKey")));
break;
default:
throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE);
}

15
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java

@ -74,8 +74,13 @@ public class WsClient extends WebSocketClient {
public WsTelemetryResponse getLastMessage() {
try {
latch.await(10, TimeUnit.SECONDS);
return this.message;
boolean result = latch.await(120, TimeUnit.SECONDS);
if (result) {
return this.message;
} else {
log.error("Timeout, ws message wasn't received");
throw new RuntimeException("Timeout, ws message wasn't received");
}
} catch (InterruptedException e) {
log.error("Timeout, ws message wasn't received");
}
@ -84,7 +89,11 @@ public class WsClient extends WebSocketClient {
void waitForFirstReply() {
try {
firstReply.await(10, TimeUnit.SECONDS);
boolean result = firstReply.await(10, TimeUnit.SECONDS);
if (!result) {
log.error("Timeout, ws message wasn't received");
throw new RuntimeException("Timeout, ws message wasn't received");
}
} catch (InterruptedException e) {
log.error("Timeout, ws message wasn't received");
throw new RuntimeException(e);

6
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java

@ -277,7 +277,7 @@ public class MqttClientTest extends AbstractContainerTest {
});
// Wait for RPC call from the server and send the response
MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS);
Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage());
@ -320,8 +320,8 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get();
// Check the response from the server
TimeUnit.SECONDS.sleep(1);
MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
MqttEvent responseFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length()));
Assert.assertEquals(requestId, responseId);
Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText());

2
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java

@ -410,6 +410,8 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
String deviceName = "mqtt_device";
mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get();
TimeUnit.SECONDS.sleep(60);
List<EntityRelation> relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON);
Assert.assertEquals(1, relations.size());

1
msa/pom.xml

@ -46,6 +46,7 @@
<module>web-ui</module>
<module>tb-node</module>
<module>transport</module>
<module>black-box-tests</module>
</modules>
<profiles>

Loading…
Cancel
Save