diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java index 08e5cfe6d3..05075ba9de 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java +++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java @@ -22,7 +22,12 @@ import org.eclipse.paho.client.mqttv3.IMqttToken; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.security.DeviceCredentials; +import java.io.BufferedWriter; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -58,16 +63,20 @@ public class MqttStressTestTool { List clients = new ArrayList<>(); List connectTokens = new ArrayList<>(); + List deviceCredentialsIds = new ArrayList<>(); for (int i = 0; i < params.getDeviceCount(); i++) { Device device = restClient.createDevice("Device " + UUID.randomUUID()); DeviceCredentials credentials = restClient.getCredentials(device.getId()); String[] mqttUrls = params.getMqttUrls(); String mqttURL = mqttUrls[i % mqttUrls.length]; MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId()); + deviceCredentialsIds.add(credentials.getCredentialsId()); connectTokens.add(client.connect()); clients.add(client); } + dumpDeviceCredentialsIdsToTmpFile(deviceCredentialsIds); + for (IMqttToken tokens : connectTokens) { tokens.waitForCompletion(); } @@ -114,4 +123,20 @@ public class MqttStressTestTool { scheduler.shutdownNow(); } + private static void dumpDeviceCredentialsIdsToTmpFile(List deviceCredentialsIds) throws IOException { + Path path = Paths.get("/tmp/mqtt.csv"); + try (BufferedWriter writer = Files.newBufferedWriter(path)) { + writer.write("deviceCredentialsId"); + writer.write('\n'); + deviceCredentialsIds.forEach((deviceCredentialsId) -> { + try { + writer.write(deviceCredentialsId); + writer.write('\n'); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } + } + } diff --git a/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala b/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala index 6ac17d74fa..d82ed5b7d4 100644 --- a/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala +++ b/tools/src/test/scala/org/thingsboard/client/tools/MqttSimulation.scala @@ -1,18 +1,18 @@ /** - * Copyright © 2016 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.client.tools import io.gatling.core.Predef._ @@ -22,16 +22,19 @@ import scala.concurrent.duration._ import com.github.mnogu.gatling.mqtt.Predef._ class MqttSimulation extends Simulation { + val mqttConf = mqtt .host("tcp://localhost:1883") - .userName("A1_TEST_TOKEN") + .userName("${deviceCredentialsId}") val scn = scenario("MQTT Test") + .feed(csv("/tmp/mqtt.csv").circular) .exec(mqtt("request") .publish("v1/devices/me/telemetry", "{\"key1\":\"value1\", \"key2\":\"value2\"}", QoS.AT_LEAST_ONCE, retain = false)) setUp( - scn - .inject(constantUsersPerSec(2) during(5 seconds))) - .protocols(mqttConf) + scn + .inject(constantUsersPerSec(1000) during (5 seconds)) + ).protocols(mqttConf) + } \ No newline at end of file