From 8ecde928dcb99c98348b22caa928e35e17fbc1bf Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Wed, 25 Jan 2023 16:25:25 +0200 Subject: [PATCH] sparkplug: Test Telemetry comment2 --- .../controller/AbstractControllerTest.java | 5 - .../server/controller/AbstractWebTest.java | 4 + .../AbstractMqttV5ClientSparkplugTest.java | 23 +- ...ctMqttV5ClientSparkplugConnectionTest.java | 16 +- ...actMqttV5ClientSparkplugTelemetryTest.java | 368 ++++++++++++------ .../MqttV5ClientSparkplugBTelemetryTest.java | 10 +- .../util/sparkplug/SparkplugMetricUtil.java | 215 +++++----- 7 files changed, 377 insertions(+), 264 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java index f3e632b4b4..ed8d8632da 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootContextLoader; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.web.server.LocalServerPort; @@ -30,7 +29,6 @@ import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.web.socket.config.annotation.EnableWebSocket; -import org.thingsboard.server.dao.timeseries.TimeseriesService; import java.net.URI; import java.net.URISyntaxException; @@ -51,9 +49,6 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest { public static final String WS_URL = "ws://localhost:"; - @Autowired - public TimeseriesService tsService; - @LocalServerPort protected int wsPort; diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 9ea269918e..3d7dc62cba 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -83,6 +83,7 @@ import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.config.ThingsboardSecurityConfiguration; import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.tenant.TenantProfileService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.service.mail.TestMailService; import org.thingsboard.server.service.security.auth.jwt.RefreshTokenRequest; import org.thingsboard.server.service.security.auth.rest.LoginRequest; @@ -167,6 +168,9 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { @Autowired private TenantProfileService tenantProfileService; + @Autowired + public TimeseriesService tsService; + @Rule public TestRule watcher = new TestWatcher() { protected void starting(Description description) { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index 13f364ff72..879629005c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -80,35 +80,40 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode()); } - protected SparkplugBProto.Payload.Metric createMetric(TsKvEntry tsKvEntry, MetricDataType metricDataType) throws ThingsboardException { + protected SparkplugBProto.Payload.Metric createMetric(Object value, TsKvEntry tsKvEntry, MetricDataType metricDataType) throws ThingsboardException { SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder() .setTimestamp(tsKvEntry.getTs()) .setName(tsKvEntry.getKey()) .setDatatype(metricDataType.toIntValue()) .build(); - Object value = tsKvEntry.getValue(); switch (metricDataType) { case Int8: case Int16: - case Int32: case UInt8: case UInt16: - return metric.toBuilder().setIntValue(Integer.parseInt(String.valueOf(value))).build(); + int valueMetric = Integer.valueOf(String.valueOf(value)); + return metric.toBuilder().setIntValue(valueMetric).build(); + case Int32: case UInt32: + if (value instanceof Long) { + return metric.toBuilder().setLongValue((long) value).build(); + } else { + return metric.toBuilder().setIntValue((int)value).build(); + } case Int64: case UInt64: case DateTime: - return metric.toBuilder().setLongValue(Long.parseLong(String.valueOf(value))).build(); + return metric.toBuilder().setLongValue((long) value).build(); case Float: - return metric.toBuilder().setFloatValue(Float.parseFloat(String.valueOf(value))).build(); + return metric.toBuilder().setFloatValue((float) value).build(); case Double: - return metric.toBuilder().setDoubleValue(Double.parseDouble(String.valueOf(value))).build(); + return metric.toBuilder().setDoubleValue((double) value).build(); case Boolean: - return metric.toBuilder().setBooleanValue(Boolean.parseBoolean(String.valueOf(value))).build(); + return metric.toBuilder().setBooleanValue((boolean) value).build(); case String: case Text: case UUID: - return metric.toBuilder().setStringValue(String.valueOf(value)).build(); + return metric.toBuilder().setStringValue((String) value).build(); case DataSet: return metric.toBuilder().setDatasetValue((SparkplugBProto.Payload.DataSet) value).build(); case Bytes: diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index 594acc3aec..c0f9bf6797 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -63,7 +63,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()); - deathPayload.addMetrics(createMetric(tsKvEntryBdSecOriginal, metricDataType)); + deathPayload.addMetrics(createMetric(value, tsKvEntryBdSecOriginal, metricDataType)); MqttWireMessage response = clientWithCorrectNodeAccessTokenWithNDEATH(deathPayload.build().toByteArray()); @@ -94,7 +94,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr String keys = "Device Metric int32"; int valueDeviceInt32 = 1024; TsKvEntry expectedTsKvEntryDeviceInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueDeviceInt32))); - SparkplugBProto.Payload.Metric metric = createMetric(expectedTsKvEntryDeviceInt32, metricDataType); + SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, expectedTsKvEntryDeviceInt32, metricDataType); for (int i=0; i < cntDevices; i++ ) { SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()) @@ -119,17 +119,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); return device.get() != null; }); - Assert.assertEquals(deviceName, device.get().getName()); - AtomicReference>> finalFuture = new AtomicReference<>(); - await(alias + SparkplugMessageType.DBIRTH.name()) - .atMost(40, TimeUnit.SECONDS) - .until(() -> { - finalFuture.set(tsService.findLatest(tenantId, device.get().getId(), keys)); - return finalFuture.get().get().isPresent(); - }); - TsKvEntry actualTsKvEntry = finalFuture.get().get().get(); - Assert.assertEquals(expectedTsKvEntryDeviceInt32, actualTsKvEntry); - } + } } private MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH(byte[] deathBytes) throws Exception { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index 8bc8593f05..82bf6d52d6 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -15,11 +15,17 @@ */ package org.thingsboard.server.transport.mqtt.sparkplug.timeseries; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; +import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -28,10 +34,11 @@ import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSpark import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; -import java.math.BigInteger; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; -import java.util.Random; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -49,198 +56,305 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp * Created by nickAS21 on 12.01.23 */ @Slf4j -public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends AbstractMqttV5ClientSparkplugTest { +public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends AbstractMqttV5ClientSparkplugTest { - protected void processClientWithCorrectAccessTokenPushDeviceMetricBuildSimple() throws Exception { + protected ThreadLocalRandom random = ThreadLocalRandom.current(); + + protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { + processClientWithCorrectNodeAccess(); + List listKeys = new ArrayList<>(); + SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() + .setTimestamp(calendar.getTimeInMillis()); + long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; + long valueBdSec = getBdSeqNum(); + MetricDataType metricDataType = Int64; + TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec)); + payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType)); + listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq); + + String keys = "Node Control/Rebirth"; + boolean valueRebirth = false; + metricDataType = MetricDataType.Boolean; + TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth)); + payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType)); + listKeys.add(keys); + + keys = "Node Metric int32"; + int valueNodeInt32 = 1024; + metricDataType = Int32; + TsKvEntry expectedSsKvEntryNodeInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueNodeInt32))); + payloadBirthNode.addMetrics(createMetric(valueNodeInt32, expectedSsKvEntryNodeInt32, metricDataType)); + listKeys.add(keys); + + client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, + payloadBirthNode.build().toByteArray(), 0, false); + + AtomicReference>> finalFuture = new AtomicReference<>(); + await(alias + SparkplugMessageType.NBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys)); + return !finalFuture.get().get().isEmpty(); + }); + Assert.assertEquals(listKeys.size(), finalFuture.get().get().size()); + } + + protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception { + processClientWithCorrectNodeAccess(); + SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() + .setTimestamp(calendar.getTimeInMillis()); + List listKeys = new ArrayList<>(); + long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; + long valueBdSec = getBdSeqNum(); + MetricDataType metricDataType = Int64; + TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec)); + payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType)); + listKeys.add(SparkplugMessageType.NCMD.name() + " " + keysBdSeq); + + String keys = "Node Control/Rebirth"; + boolean valueRebirth = true; + metricDataType = MetricDataType.Boolean; + TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth)); + payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType)); + listKeys.add(keys); + + client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NCMD.name() + "/" + edgeNode, + payloadBirthNode.build().toByteArray(), 0, false); + + AtomicReference>> finalFuture = new AtomicReference<>(); + await(alias + SparkplugMessageType.NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys)); + return !finalFuture.get().get().isEmpty(); + }); + Assert.assertEquals(listKeys.size(), finalFuture.get().get().size()); + } + + protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception { processClientWithCorrectNodeAccess(); - Random random = new Random(); + String deviceName = deviceId + "_" + 10; - String messageTypeName = SparkplugMessageType.DDATA.name(); + String messageTypeName = SparkplugMessageType.NDATA.name(); List listKeys = new ArrayList<>(); + List listTsKvEntry = new ArrayList<>(); - SparkplugBProto.Payload.Builder ddataPayload = SparkplugBProto.Payload.newBuilder() + SparkplugBProto.Payload.Builder ndataPayload = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()) .setSeq(getSeqNum()); - long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS; + long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; + + createdAddMetricTsKv(listTsKvEntry, listKeys, ndataPayload, ts); + + if (client.isConnected()) { + client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, + ndataPayload.build().toByteArray(), 0, false); + } + + AtomicReference>> finalFuture = new AtomicReference<>(); + await(alias + SparkplugMessageType.NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); + return finalFuture.get().get().size() == listTsKvEntry.size(); + }); + Assert.assertTrue("Expected tsKvEntrys is not equal Actual tsKvEntrys", listTsKvEntry.containsAll(finalFuture.get().get())); + Assert.assertTrue("Actual tsKvEntrys is not equal Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); + } + + private void createdAddMetricTsKv(List listTsKvEntry, List listKeys, + SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException { String keys = "MyInt8"; - MetricDataType metricDataType = Int8; - TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)((byte)random.nextInt()))); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt8(), ts, Int8)); listKeys.add(keys); keys = "MyInt16"; - metricDataType = Int16; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)((short)random.nextInt()))); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt16(), ts, Int16)); listKeys.add(keys); keys = "MyInt32"; - metricDataType = Int32; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)(random.nextInt()))); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt32(), ts, Int32)); listKeys.add(keys); keys = "MyInt64"; - metricDataType = UInt64; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, random.nextLong())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt64(), ts, Int64)); listKeys.add(keys); keys = "MyUInt8"; - metricDataType = UInt8; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)((short)random.nextInt()))); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt8(), ts, UInt8)); listKeys.add(keys); keys = "MyUInt16"; - metricDataType = UInt16; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)random.nextInt())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt16(), ts, UInt16)); listKeys.add(keys); - keys = "MyUInt32"; - metricDataType = UInt32; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, random.nextLong())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + keys = "MyUInt32I"; + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt32I(), ts, UInt32)); + listKeys.add(keys); + + keys = "MyUInt32L"; + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt32L(), ts, UInt32)); listKeys.add(keys); keys = "MyUInt64"; - metricDataType = UInt64; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, BigInteger.valueOf(random.nextLong()).longValue())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt64(), ts, UInt64)); listKeys.add(keys); keys = "MyFloat"; - metricDataType = MetricDataType.Float; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)random.nextFloat())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvFloat(dataPayload, keys, nextFloat(0, 100), ts, MetricDataType.Float)); listKeys.add(keys); keys = "MyDateTime"; - metricDataType = MetricDataType.DateTime; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, ts)); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, ts, ts, MetricDataType.DateTime)); listKeys.add(keys); keys = "MyDouble"; - metricDataType = MetricDataType.Double; - tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long) random.nextDouble())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvDouble(dataPayload, keys, nextDouble(), ts, MetricDataType.Double)); listKeys.add(keys); keys = "MyBoolean"; - metricDataType = MetricDataType.Boolean; - tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, random.nextBoolean())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvBoolean(dataPayload, keys, nextBoolean(), ts, MetricDataType.Boolean)); listKeys.add(keys); keys = "MyString"; - metricDataType = MetricDataType.String; - tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, newUUID())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nexString(), ts, MetricDataType.String)); listKeys.add(keys); keys = "MyText"; - metricDataType = MetricDataType.Text; - tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, newUUID())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nexString(), ts, MetricDataType.Text)); listKeys.add(keys); keys = "MyUUID"; - metricDataType = MetricDataType.Text; - tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, newUUID())); - ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType)); + listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nexString(), ts, MetricDataType.UUID)); listKeys.add(keys); - if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode + "/" + deviceName, - ddataPayload.build().toByteArray(), 0, false); - } + } - AtomicReference>> finalFuture = new AtomicReference<>(); - await(alias + SparkplugMessageType.NCMD.name()) - .atMost(40, TimeUnit.SECONDS) - .until(() -> { - finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys)); - return !finalFuture.get().get().isEmpty(); - }); - Assert.assertEquals(listKeys.size(), finalFuture.get().get().size()); + private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value, + long ts, MetricDataType metricDataType) throws ThingsboardException { + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, Long.valueOf(String.valueOf(value)))); + + dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + return tsKvEntry; } - protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { - processClientWithCorrectNodeAccess(); - List listKeys = new ArrayList<>(); - SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() - .setTimestamp(calendar.getTimeInMillis()); - long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS; - long valueBdSec = getBdSeqNum(); - MetricDataType metricDataType = Int64; - TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec)); - payloadBirthNode.addMetrics(createMetric(tsKvEntryBdSecOriginal, metricDataType)); - listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq); + private TsKvEntry createdAddMetricTsKvFloat(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value, + long ts, MetricDataType metricDataType) throws ThingsboardException { + var f = new BigDecimal(String.valueOf(value)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(keys, f.doubleValue())); + dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + return tsKvEntry; + } - String keys = "Node Control/Rebirth"; - boolean valueRebirth = false; - metricDataType = MetricDataType.Boolean; - TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth)); - payloadBirthNode.addMetrics(createMetric(expectedSsKvEntryRebirth , metricDataType)); - listKeys.add(keys); + private TsKvEntry createdAddMetricTsKvDouble(SparkplugBProto.Payload.Builder dataPayload, String keys, double value, + long ts, MetricDataType metricDataType) throws ThingsboardException { + var d = new BigDecimal(String.valueOf(value)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, d.longValueExact())); + dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + return tsKvEntry; + } - keys = "Node Metric int32"; - int valueNodeInt32 = 1024; - metricDataType = MetricDataType.Boolean; - TsKvEntry expectedSsKvEntryNodeInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueNodeInt32))); - payloadBirthNode.addMetrics(createMetric(expectedSsKvEntryNodeInt32 , metricDataType)); - listKeys.add(keys); + private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String keys, boolean value, + long ts, MetricDataType metricDataType) throws ThingsboardException { + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, value)); + dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + return tsKvEntry; + } - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, - payloadBirthNode.build().toByteArray(), 0, false); + private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String keys, String value, + long ts, MetricDataType metricDataType) throws ThingsboardException { + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, value)); + dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + return tsKvEntry; + } - AtomicReference>> finalFuture = new AtomicReference<>(); - await(alias + SparkplugMessageType.NBIRTH.name()) - .atMost(40, TimeUnit.SECONDS) - .until(() -> { - finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys)); - return !finalFuture.get().get().isEmpty(); - }); - Assert.assertEquals(listKeys.size(), finalFuture.get().get().size()); + private TsKvEntry createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys, String value, + long ts, MetricDataType metricDataType) throws ThingsboardException { + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(keys, value)); + dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + return tsKvEntry; } - protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception { - processClientWithCorrectNodeAccess(); - SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() - .setTimestamp(calendar.getTimeInMillis()); - List listKeys = new ArrayList<>(); - long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS; - long valueBdSec = getBdSeqNum(); - MetricDataType metricDataType = Int64; - TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec)); - payloadBirthNode.addMetrics(createMetric(tsKvEntryBdSecOriginal, metricDataType)); - listKeys.add(SparkplugMessageType.NCMD.name() + " " + keysBdSeq); + private byte nextInt8() { + return (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE); + } - String keys = "Node Control/Rebirth"; - boolean valueRebirth = true; - metricDataType = MetricDataType.Boolean; - TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth)); - payloadBirthNode.addMetrics(createMetric(expectedSsKvEntryRebirth , metricDataType)); - listKeys.add(keys); + private short nextUInt8() { + return (short) random.nextInt(0, Byte.MAX_VALUE * 2 + 1); + } - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NCMD.name() + "/" + edgeNode, - payloadBirthNode.build().toByteArray(), 0, false); + private short nextInt16() { + return (short) random.nextInt(Short.MIN_VALUE, Short.MAX_VALUE); + } - AtomicReference>> finalFuture = new AtomicReference<>(); - await(alias + SparkplugMessageType.NCMD.name()) - .atMost(40, TimeUnit.SECONDS) - .until(() -> { - finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys)); - return !finalFuture.get().get().isEmpty(); - }); - Assert.assertEquals(listKeys.size(), finalFuture.get().get().size()); + private short nextUInt16() { + return (short) random.nextInt(0, Short.MAX_VALUE * 2 + 1); + } + + private int nextInt32() { + return random.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE); + } + + private int nextUInt32I() { + return random.nextInt(0, Integer.MAX_VALUE); + } + + private long nextUInt32L() { + long l = Integer.MAX_VALUE; + return random.nextLong(0, l * 2 + 1); + } + + private long nextInt64() { + return random.nextLong(Long.MIN_VALUE, Long.MAX_VALUE); + } + + private long nextUInt64() { + double d = Long.MAX_VALUE; + return random.nextLong(0, (long) (d * 2 + 1)); } - private String newUUID() { + private double nextDouble() { + return random.nextDouble(Long.MIN_VALUE, Long.MAX_VALUE); + } + + private float nextFloat(float min, float max) { + if (min >= max) + throw new IllegalArgumentException("max must be greater than min"); + float result = ThreadLocalRandom.current().nextFloat() * (max - min) + min; + if (result >= max) // correct for rounding + result = Float.intBitsToFloat(Float.floatToIntBits(max) - 1); + return result; + } + + + private boolean nextBoolean() { + return random.nextBoolean(); + } + + private String nexString() { return java.util.UUID.randomUUID().toString(); } + private List getActualKeysList(DeviceId deviceId, List expectedKeys, long start) throws Exception { +// long start = System.currentTimeMillis(); + long end = System.currentTimeMillis() + 3000; + + List actualKeys = null; + while (start <= end) { + actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", new TypeReference<>() { + }); + + Map> timeseries = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + + savedGateway.getId().getId() + "/values/timeseries?keys=" + expectedKeys.get(6), new TypeReference<>() { + }); + if (actualKeys.size() == expectedKeys.size()) { + break; + } + Thread.sleep(300); + start += 100; + } + return actualKeys; + } + } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java index 678ce69066..63aec2dba2 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java @@ -38,11 +38,6 @@ public class MqttV5ClientSparkplugBTelemetryTest extends AbstractMqttV5ClientSpa client.disconnect(); } } - @Test - public void testClientWithCorrectAccessTokenPushDeviceMetricBuildSimple() throws Exception { - processClientWithCorrectAccessTokenPushDeviceMetricBuildSimple(); - } - @Test public void testClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { processClientWithCorrectAccessTokenPublishNBIRTH(); @@ -53,4 +48,9 @@ public class MqttV5ClientSparkplugBTelemetryTest extends AbstractMqttV5ClientSpa processClientWithCorrectAccessTokenPublishNCMDReBirth(); } + @Test + public void testClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception { + processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple(); + } + } \ No newline at end of file diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index 53d173b7fb..e03f2a0664 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; @@ -50,116 +51,120 @@ public class SparkplugMetricUtil { TransportProtos.KeyValueProto.Builder builderProto = TransportProtos.KeyValueProto.newBuilder(); ArrayNode nodeArray = newArrayNode(); MetricDataType metricDataType = MetricDataType.fromInteger(metricType); - switch (metricDataType) { - case Boolean: - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V) - .setBoolV(protoMetric.getBooleanValue()).build()); - case DateTime: - case Int64: - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) - .setLongV(protoMetric.getLongValue()).build()); - case Float: - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) - .setLongV((long) protoMetric.getFloatValue()).build()); - case Double: - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) - .setDoubleV(protoMetric.getDoubleValue()).build()); - case Int8: - case UInt8: - case Int16: - case Int32: - case UInt16: - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) - .setLongV(protoMetric.getIntValue()).build()); - case UInt32: - case UInt64: - if (protoMetric.hasIntValue()) { - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) - .setLongV(protoMetric.getIntValue()).build()); - } else if (protoMetric.hasLongValue()) { + try { + switch (metricDataType) { + case Boolean: + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V) + .setBoolV(protoMetric.getBooleanValue()).build()); + case DateTime: + case Int64: return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) .setLongV(protoMetric.getLongValue()).build()); - } else { - log.error("Invalid value for UInt32 datatype"); - throw new ThingsboardException("Invalid value for " + MetricDataType.fromInteger(metricType).name() + " datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); - } - case String: - case Text: - case UUID: - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV(protoMetric.getStringValue()).build()); - case Bytes: - case BooleanArray: - case Int8Array: - case Int16Array: - case Int32Array: - case UInt8Array: - case UInt16Array: - case FloatArray: - case DoubleArray: - case DateTimeArray: - case Int64Array: - case UInt64Array: - case UInt32Array: - ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) { - byteBuffer.order(ByteOrder.LITTLE_ENDIAN); - } - while (byteBuffer.hasRemaining()) { - setValueToNodeArray(nodeArray, byteBuffer, metricType); - } - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) - .setJsonV(nodeArray.toString()).build()); - case StringArray: - ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN); - StringBuilder sb = new StringBuilder(); - while (stringByteBuffer.hasRemaining()) { - byte b = stringByteBuffer.get(); - if (b == (byte) 0) { - nodeArray.add(sb.toString()); - sb = new StringBuilder(); + case Float: + var f = new BigDecimal(String.valueOf(protoMetric.getFloatValue())); + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) + .setDoubleV(f.doubleValue()).build()); + case Double: + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) + .setDoubleV(protoMetric.getDoubleValue()).build()); + case Int8: + case UInt8: + case Int16: + case Int32: + case UInt16: + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + .setLongV(protoMetric.getIntValue()).build()); + case UInt32: + case UInt64: + if (protoMetric.hasIntValue()) { + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + .setLongV(protoMetric.getIntValue()).build()); + } else if (protoMetric.hasLongValue()) { + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + .setLongV(protoMetric.getLongValue()).build()); } else { - sb.append((char) b); + log.error("Invalid value for UInt32 datatype"); + throw new ThingsboardException("Invalid value for " + MetricDataType.fromInteger(metricType).name() + " datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); } - } - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) - .setJsonV(nodeArray.toString()).build()); - case DataSet: - case Template: - case File: - //TODO - // Build the and create the DataSet - /** - SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue(); - return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList()) - .addTypes(convertDataSetDataTypes(protoDataSet.getTypesList())) - .addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList())) - .createDataSet(); - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV(protoDataSet.toString()).build()); - **/ - //TODO - // Build the and create the Template - /** - SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue(); - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV( protoTemplate.toString()).build()); - **/ - //TODO - // Build the and create the File - /** - String filename = protoMetric.getMetadata().getFileName(); - return Optional.of(builderPrbyteValueoto.setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build()); - **/ - return Optional.empty(); - case Unknown: - default: - throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); + case String: + case Text: + case UUID: + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) + .setStringV(protoMetric.getStringValue()).build()); + case Bytes: + case BooleanArray: + case Int8Array: + case Int16Array: + case Int32Array: + case UInt8Array: + case UInt16Array: + case FloatArray: + case DoubleArray: + case DateTimeArray: + case Int64Array: + case UInt64Array: + case UInt32Array: + ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); + if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) { + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + } + while (byteBuffer.hasRemaining()) { + setValueToNodeArray(nodeArray, byteBuffer, metricType); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + case StringArray: + ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); + stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN); + StringBuilder sb = new StringBuilder(); + while (stringByteBuffer.hasRemaining()) { + byte b = stringByteBuffer.get(); + if (b == (byte) 0) { + nodeArray.add(sb.toString()); + sb = new StringBuilder(); + } else { + sb.append((char) b); + } + } + return Optional.of(builderProto.setKey(key) + .setJsonV(nodeArray.toString()).build()); + case DataSet: + case Template: + case File: + //TODO + // Build the and create the DataSet + /** + SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue(); + return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList()) + .addTypes(convertDataSetDataTypes(protoDataSet.getTypesList())) + .addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList())) + .createDataSet(); + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) + .setStringV(protoDataSet.toString()).build()); + **/ + //TODO + // Build the and create the Template + /** + SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue(); + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) + .setStringV( protoTemplate.toString()).build()); + **/ + //TODO + // Build the and create the File + /** + String filename = protoMetric.getMetadata().getFileName(); + return Optional.of(builderPrbyteValueoto.setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V) + .setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build()); + **/ + return Optional.empty(); + case Unknown: + default: + throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); + } + } catch (Exception e){ + log.error("", e); + return Optional.empty(); } - - } private static void setValueToNodeArray(ArrayNode nodeArray, ByteBuffer byteBuffer, int metricType) {