Browse Source

sparkplug: Test Telemetry comment2

pull/7931/head
nickAS21 3 years ago
parent
commit
8ecde928dc
  1. 5
      application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
  2. 4
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java
  3. 23
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java
  4. 16
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java
  5. 368
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java
  6. 10
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java
  7. 215
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java

5
application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java

@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootContextLoader;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
@ -30,7 +29,6 @@ import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.net.URI;
import java.net.URISyntaxException;
@ -51,9 +49,6 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest {
public static final String WS_URL = "ws://localhost:";
@Autowired
public TimeseriesService tsService;
@LocalServerPort
protected int wsPort;

4
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -83,6 +83,7 @@ import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.config.ThingsboardSecurityConfiguration;
import org.thingsboard.server.dao.Dao;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.service.mail.TestMailService;
import org.thingsboard.server.service.security.auth.jwt.RefreshTokenRequest;
import org.thingsboard.server.service.security.auth.rest.LoginRequest;
@ -167,6 +168,9 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
@Autowired
private TenantProfileService tenantProfileService;
@Autowired
public TimeseriesService tsService;
@Rule
public TestRule watcher = new TestWatcher() {
protected void starting(Description description) {

23
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java

@ -80,35 +80,40 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode());
}
protected SparkplugBProto.Payload.Metric createMetric(TsKvEntry tsKvEntry, MetricDataType metricDataType) throws ThingsboardException {
protected SparkplugBProto.Payload.Metric createMetric(Object value, TsKvEntry tsKvEntry, MetricDataType metricDataType) throws ThingsboardException {
SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder()
.setTimestamp(tsKvEntry.getTs())
.setName(tsKvEntry.getKey())
.setDatatype(metricDataType.toIntValue())
.build();
Object value = tsKvEntry.getValue();
switch (metricDataType) {
case Int8:
case Int16:
case Int32:
case UInt8:
case UInt16:
return metric.toBuilder().setIntValue(Integer.parseInt(String.valueOf(value))).build();
int valueMetric = Integer.valueOf(String.valueOf(value));
return metric.toBuilder().setIntValue(valueMetric).build();
case Int32:
case UInt32:
if (value instanceof Long) {
return metric.toBuilder().setLongValue((long) value).build();
} else {
return metric.toBuilder().setIntValue((int)value).build();
}
case Int64:
case UInt64:
case DateTime:
return metric.toBuilder().setLongValue(Long.parseLong(String.valueOf(value))).build();
return metric.toBuilder().setLongValue((long) value).build();
case Float:
return metric.toBuilder().setFloatValue(Float.parseFloat(String.valueOf(value))).build();
return metric.toBuilder().setFloatValue((float) value).build();
case Double:
return metric.toBuilder().setDoubleValue(Double.parseDouble(String.valueOf(value))).build();
return metric.toBuilder().setDoubleValue((double) value).build();
case Boolean:
return metric.toBuilder().setBooleanValue(Boolean.parseBoolean(String.valueOf(value))).build();
return metric.toBuilder().setBooleanValue((boolean) value).build();
case String:
case Text:
case UUID:
return metric.toBuilder().setStringValue(String.valueOf(value)).build();
return metric.toBuilder().setStringValue((String) value).build();
case DataSet:
return metric.toBuilder().setDatasetValue((SparkplugBProto.Payload.DataSet) value).build();
case Bytes:

16
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java

@ -63,7 +63,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr
SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
deathPayload.addMetrics(createMetric(tsKvEntryBdSecOriginal, metricDataType));
deathPayload.addMetrics(createMetric(value, tsKvEntryBdSecOriginal, metricDataType));
MqttWireMessage response = clientWithCorrectNodeAccessTokenWithNDEATH(deathPayload.build().toByteArray());
@ -94,7 +94,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr
String keys = "Device Metric int32";
int valueDeviceInt32 = 1024;
TsKvEntry expectedTsKvEntryDeviceInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueDeviceInt32)));
SparkplugBProto.Payload.Metric metric = createMetric(expectedTsKvEntryDeviceInt32, metricDataType);
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, expectedTsKvEntryDeviceInt32, metricDataType);
for (int i=0; i < cntDevices; i++ ) {
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis())
@ -119,17 +119,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
return device.get() != null;
});
Assert.assertEquals(deviceName, device.get().getName());
AtomicReference<ListenableFuture<Optional<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.DBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findLatest(tenantId, device.get().getId(), keys));
return finalFuture.get().get().isPresent();
});
TsKvEntry actualTsKvEntry = finalFuture.get().get().get();
Assert.assertEquals(expectedTsKvEntryDeviceInt32, actualTsKvEntry);
}
}
}
private MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH(byte[] deathBytes) throws Exception {

368
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java

@ -15,11 +15,17 @@
*/
package org.thingsboard.server.transport.mqtt.sparkplug.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -28,10 +34,11 @@ import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSpark
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import java.math.BigInteger;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -49,198 +56,305 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp
* Created by nickAS21 on 12.01.23
*/
@Slf4j
public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends AbstractMqttV5ClientSparkplugTest {
public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends AbstractMqttV5ClientSparkplugTest {
protected void processClientWithCorrectAccessTokenPushDeviceMetricBuildSimple() throws Exception {
protected ThreadLocalRandom random = ThreadLocalRandom.current();
protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception {
processClientWithCorrectNodeAccess();
List<String> listKeys = new ArrayList<>();
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
MetricDataType metricDataType = Int64;
TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec));
payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType));
listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq);
String keys = "Node Control/Rebirth";
boolean valueRebirth = false;
metricDataType = MetricDataType.Boolean;
TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth));
payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType));
listKeys.add(keys);
keys = "Node Metric int32";
int valueNodeInt32 = 1024;
metricDataType = Int32;
TsKvEntry expectedSsKvEntryNodeInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueNodeInt32)));
payloadBirthNode.addMetrics(createMetric(valueNodeInt32, expectedSsKvEntryNodeInt32, metricDataType));
listKeys.add(keys);
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys));
return !finalFuture.get().get().isEmpty();
});
Assert.assertEquals(listKeys.size(), finalFuture.get().get().size());
}
protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception {
processClientWithCorrectNodeAccess();
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
List<String> listKeys = new ArrayList<>();
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
MetricDataType metricDataType = Int64;
TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec));
payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType));
listKeys.add(SparkplugMessageType.NCMD.name() + " " + keysBdSeq);
String keys = "Node Control/Rebirth";
boolean valueRebirth = true;
metricDataType = MetricDataType.Boolean;
TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth));
payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType));
listKeys.add(keys);
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NCMD.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys));
return !finalFuture.get().get().isEmpty();
});
Assert.assertEquals(listKeys.size(), finalFuture.get().get().size());
}
protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception {
processClientWithCorrectNodeAccess();
Random random = new Random();
String deviceName = deviceId + "_" + 10;
String messageTypeName = SparkplugMessageType.DDATA.name();
String messageTypeName = SparkplugMessageType.NDATA.name();
List<String> listKeys = new ArrayList<>();
List<TsKvEntry> listTsKvEntry = new ArrayList<>();
SparkplugBProto.Payload.Builder ddataPayload = SparkplugBProto.Payload.newBuilder()
SparkplugBProto.Payload.Builder ndataPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis())
.setSeq(getSeqNum());
long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS;
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
createdAddMetricTsKv(listTsKvEntry, listKeys, ndataPayload, ts);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode,
ndataPayload.build().toByteArray(), 0, false);
}
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
return finalFuture.get().get().size() == listTsKvEntry.size();
});
Assert.assertTrue("Expected tsKvEntrys is not equal Actual tsKvEntrys", listTsKvEntry.containsAll(finalFuture.get().get()));
Assert.assertTrue("Actual tsKvEntrys is not equal Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry));
}
private void createdAddMetricTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys,
SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException {
String keys = "MyInt8";
MetricDataType metricDataType = Int8;
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)((byte)random.nextInt())));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt8(), ts, Int8));
listKeys.add(keys);
keys = "MyInt16";
metricDataType = Int16;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)((short)random.nextInt())));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt16(), ts, Int16));
listKeys.add(keys);
keys = "MyInt32";
metricDataType = Int32;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)(random.nextInt())));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt32(), ts, Int32));
listKeys.add(keys);
keys = "MyInt64";
metricDataType = UInt64;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, random.nextLong()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt64(), ts, Int64));
listKeys.add(keys);
keys = "MyUInt8";
metricDataType = UInt8;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)((short)random.nextInt())));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt8(), ts, UInt8));
listKeys.add(keys);
keys = "MyUInt16";
metricDataType = UInt16;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)random.nextInt()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt16(), ts, UInt16));
listKeys.add(keys);
keys = "MyUInt32";
metricDataType = UInt32;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, random.nextLong()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
keys = "MyUInt32I";
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt32I(), ts, UInt32));
listKeys.add(keys);
keys = "MyUInt32L";
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt32L(), ts, UInt32));
listKeys.add(keys);
keys = "MyUInt64";
metricDataType = UInt64;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, BigInteger.valueOf(random.nextLong()).longValue()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt64(), ts, UInt64));
listKeys.add(keys);
keys = "MyFloat";
metricDataType = MetricDataType.Float;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long)random.nextFloat()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvFloat(dataPayload, keys, nextFloat(0, 100), ts, MetricDataType.Float));
listKeys.add(keys);
keys = "MyDateTime";
metricDataType = MetricDataType.DateTime;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, ts));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, ts, ts, MetricDataType.DateTime));
listKeys.add(keys);
keys = "MyDouble";
metricDataType = MetricDataType.Double;
tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, (long) random.nextDouble()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvDouble(dataPayload, keys, nextDouble(), ts, MetricDataType.Double));
listKeys.add(keys);
keys = "MyBoolean";
metricDataType = MetricDataType.Boolean;
tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, random.nextBoolean()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvBoolean(dataPayload, keys, nextBoolean(), ts, MetricDataType.Boolean));
listKeys.add(keys);
keys = "MyString";
metricDataType = MetricDataType.String;
tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, newUUID()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nexString(), ts, MetricDataType.String));
listKeys.add(keys);
keys = "MyText";
metricDataType = MetricDataType.Text;
tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, newUUID()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nexString(), ts, MetricDataType.Text));
listKeys.add(keys);
keys = "MyUUID";
metricDataType = MetricDataType.Text;
tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, newUUID()));
ddataPayload.addMetrics(createMetric(tsKvEntry, metricDataType));
listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nexString(), ts, MetricDataType.UUID));
listKeys.add(keys);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode + "/" + deviceName,
ddataPayload.build().toByteArray(), 0, false);
}
}
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys));
return !finalFuture.get().get().isEmpty();
});
Assert.assertEquals(listKeys.size(), finalFuture.get().get().size());
private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, Long.valueOf(String.valueOf(value))));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
return tsKvEntry;
}
protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception {
processClientWithCorrectNodeAccess();
List<String> listKeys = new ArrayList<>();
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
MetricDataType metricDataType = Int64;
TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec));
payloadBirthNode.addMetrics(createMetric(tsKvEntryBdSecOriginal, metricDataType));
listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq);
private TsKvEntry createdAddMetricTsKvFloat(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
var f = new BigDecimal(String.valueOf(value));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(keys, f.doubleValue()));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
return tsKvEntry;
}
String keys = "Node Control/Rebirth";
boolean valueRebirth = false;
metricDataType = MetricDataType.Boolean;
TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth));
payloadBirthNode.addMetrics(createMetric(expectedSsKvEntryRebirth , metricDataType));
listKeys.add(keys);
private TsKvEntry createdAddMetricTsKvDouble(SparkplugBProto.Payload.Builder dataPayload, String keys, double value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
var d = new BigDecimal(String.valueOf(value));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, d.longValueExact()));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
return tsKvEntry;
}
keys = "Node Metric int32";
int valueNodeInt32 = 1024;
metricDataType = MetricDataType.Boolean;
TsKvEntry expectedSsKvEntryNodeInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueNodeInt32)));
payloadBirthNode.addMetrics(createMetric(expectedSsKvEntryNodeInt32 , metricDataType));
listKeys.add(keys);
private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String keys, boolean value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, value));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
return tsKvEntry;
}
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String keys, String value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, value));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
return tsKvEntry;
}
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys));
return !finalFuture.get().get().isEmpty();
});
Assert.assertEquals(listKeys.size(), finalFuture.get().get().size());
private TsKvEntry createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys, String value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(keys, value));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
return tsKvEntry;
}
protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception {
processClientWithCorrectNodeAccess();
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
List<String> listKeys = new ArrayList<>();
long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
MetricDataType metricDataType = Int64;
TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec));
payloadBirthNode.addMetrics(createMetric(tsKvEntryBdSecOriginal, metricDataType));
listKeys.add(SparkplugMessageType.NCMD.name() + " " + keysBdSeq);
private byte nextInt8() {
return (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE);
}
String keys = "Node Control/Rebirth";
boolean valueRebirth = true;
metricDataType = MetricDataType.Boolean;
TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth));
payloadBirthNode.addMetrics(createMetric(expectedSsKvEntryRebirth , metricDataType));
listKeys.add(keys);
private short nextUInt8() {
return (short) random.nextInt(0, Byte.MAX_VALUE * 2 + 1);
}
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NCMD.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
private short nextInt16() {
return (short) random.nextInt(Short.MIN_VALUE, Short.MAX_VALUE);
}
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys));
return !finalFuture.get().get().isEmpty();
});
Assert.assertEquals(listKeys.size(), finalFuture.get().get().size());
private short nextUInt16() {
return (short) random.nextInt(0, Short.MAX_VALUE * 2 + 1);
}
private int nextInt32() {
return random.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE);
}
private int nextUInt32I() {
return random.nextInt(0, Integer.MAX_VALUE);
}
private long nextUInt32L() {
long l = Integer.MAX_VALUE;
return random.nextLong(0, l * 2 + 1);
}
private long nextInt64() {
return random.nextLong(Long.MIN_VALUE, Long.MAX_VALUE);
}
private long nextUInt64() {
double d = Long.MAX_VALUE;
return random.nextLong(0, (long) (d * 2 + 1));
}
private String newUUID() {
private double nextDouble() {
return random.nextDouble(Long.MIN_VALUE, Long.MAX_VALUE);
}
private float nextFloat(float min, float max) {
if (min >= max)
throw new IllegalArgumentException("max must be greater than min");
float result = ThreadLocalRandom.current().nextFloat() * (max - min) + min;
if (result >= max) // correct for rounding
result = Float.intBitsToFloat(Float.floatToIntBits(max) - 1);
return result;
}
private boolean nextBoolean() {
return random.nextBoolean();
}
private String nexString() {
return java.util.UUID.randomUUID().toString();
}
private List<String> getActualKeysList(DeviceId deviceId, List<String> expectedKeys, long start) throws Exception {
// long start = System.currentTimeMillis();
long end = System.currentTimeMillis() + 3000;
List<String> actualKeys = null;
while (start <= end) {
actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", new TypeReference<>() {
});
Map<String, List<JsonNode>> timeseries = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" +
savedGateway.getId().getId() + "/values/timeseries?keys=" + expectedKeys.get(6), new TypeReference<>() {
});
if (actualKeys.size() == expectedKeys.size()) {
break;
}
Thread.sleep(300);
start += 100;
}
return actualKeys;
}
}

10
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java

@ -38,11 +38,6 @@ public class MqttV5ClientSparkplugBTelemetryTest extends AbstractMqttV5ClientSpa
client.disconnect(); }
}
@Test
public void testClientWithCorrectAccessTokenPushDeviceMetricBuildSimple() throws Exception {
processClientWithCorrectAccessTokenPushDeviceMetricBuildSimple();
}
@Test
public void testClientWithCorrectAccessTokenPublishNBIRTH() throws Exception {
processClientWithCorrectAccessTokenPublishNBIRTH();
@ -53,4 +48,9 @@ public class MqttV5ClientSparkplugBTelemetryTest extends AbstractMqttV5ClientSpa
processClientWithCorrectAccessTokenPublishNCMDReBirth();
}
@Test
public void testClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception {
processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple();
}
}

215
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
@ -50,116 +51,120 @@ public class SparkplugMetricUtil {
TransportProtos.KeyValueProto.Builder builderProto = TransportProtos.KeyValueProto.newBuilder();
ArrayNode nodeArray = newArrayNode();
MetricDataType metricDataType = MetricDataType.fromInteger(metricType);
switch (metricDataType) {
case Boolean:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V)
.setBoolV(protoMetric.getBooleanValue()).build());
case DateTime:
case Int64:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getLongValue()).build());
case Float:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV((long) protoMetric.getFloatValue()).build());
case Double:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V)
.setDoubleV(protoMetric.getDoubleValue()).build());
case Int8:
case UInt8:
case Int16:
case Int32:
case UInt16:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getIntValue()).build());
case UInt32:
case UInt64:
if (protoMetric.hasIntValue()) {
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getIntValue()).build());
} else if (protoMetric.hasLongValue()) {
try {
switch (metricDataType) {
case Boolean:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V)
.setBoolV(protoMetric.getBooleanValue()).build());
case DateTime:
case Int64:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getLongValue()).build());
} else {
log.error("Invalid value for UInt32 datatype");
throw new ThingsboardException("Invalid value for " + MetricDataType.fromInteger(metricType).name() + " datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
case String:
case Text:
case UUID:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(protoMetric.getStringValue()).build());
case Bytes:
case BooleanArray:
case Int8Array:
case Int16Array:
case Int32Array:
case UInt8Array:
case UInt16Array:
case FloatArray:
case DoubleArray:
case DateTimeArray:
case Int64Array:
case UInt64Array:
case UInt32Array:
ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) {
byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
}
while (byteBuffer.hasRemaining()) {
setValueToNodeArray(nodeArray, byteBuffer, metricType);
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
case StringArray:
ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN);
StringBuilder sb = new StringBuilder();
while (stringByteBuffer.hasRemaining()) {
byte b = stringByteBuffer.get();
if (b == (byte) 0) {
nodeArray.add(sb.toString());
sb = new StringBuilder();
case Float:
var f = new BigDecimal(String.valueOf(protoMetric.getFloatValue()));
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V)
.setDoubleV(f.doubleValue()).build());
case Double:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V)
.setDoubleV(protoMetric.getDoubleValue()).build());
case Int8:
case UInt8:
case Int16:
case Int32:
case UInt16:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getIntValue()).build());
case UInt32:
case UInt64:
if (protoMetric.hasIntValue()) {
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getIntValue()).build());
} else if (protoMetric.hasLongValue()) {
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getLongValue()).build());
} else {
sb.append((char) b);
log.error("Invalid value for UInt32 datatype");
throw new ThingsboardException("Invalid value for " + MetricDataType.fromInteger(metricType).name() + " datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
case DataSet:
case Template:
case File:
//TODO
// Build the and create the DataSet
/**
SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue();
return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList())
.addTypes(convertDataSetDataTypes(protoDataSet.getTypesList()))
.addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList()))
.createDataSet();
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(protoDataSet.toString()).build());
**/
//TODO
// Build the and create the Template
/**
SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue();
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV( protoTemplate.toString()).build());
**/
//TODO
// Build the and create the File
/**
String filename = protoMetric.getMetadata().getFileName();
return Optional.of(builderPrbyteValueoto.setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build());
**/
return Optional.empty();
case Unknown:
default:
throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS);
case String:
case Text:
case UUID:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(protoMetric.getStringValue()).build());
case Bytes:
case BooleanArray:
case Int8Array:
case Int16Array:
case Int32Array:
case UInt8Array:
case UInt16Array:
case FloatArray:
case DoubleArray:
case DateTimeArray:
case Int64Array:
case UInt64Array:
case UInt32Array:
ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) {
byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
}
while (byteBuffer.hasRemaining()) {
setValueToNodeArray(nodeArray, byteBuffer, metricType);
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
case StringArray:
ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN);
StringBuilder sb = new StringBuilder();
while (stringByteBuffer.hasRemaining()) {
byte b = stringByteBuffer.get();
if (b == (byte) 0) {
nodeArray.add(sb.toString());
sb = new StringBuilder();
} else {
sb.append((char) b);
}
}
return Optional.of(builderProto.setKey(key)
.setJsonV(nodeArray.toString()).build());
case DataSet:
case Template:
case File:
//TODO
// Build the and create the DataSet
/**
SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue();
return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList())
.addTypes(convertDataSetDataTypes(protoDataSet.getTypesList()))
.addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList()))
.createDataSet();
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(protoDataSet.toString()).build());
**/
//TODO
// Build the and create the Template
/**
SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue();
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV( protoTemplate.toString()).build());
**/
//TODO
// Build the and create the File
/**
String filename = protoMetric.getMetadata().getFileName();
return Optional.of(builderPrbyteValueoto.setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build());
**/
return Optional.empty();
case Unknown:
default:
throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
} catch (Exception e){
log.error("", e);
return Optional.empty();
}
}
private static void setValueToNodeArray(ArrayNode nodeArray, ByteBuffer byteBuffer, int metricType) {

Loading…
Cancel
Save