Browse Source
# Conflicts: # application/src/main/java/org/thingsboard/server/controller/AlarmController.java # application/src/main/java/org/thingsboard/server/controller/EntityQueryController.javapull/7527/head
773 changed files with 19157 additions and 11469 deletions
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -1,85 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.utils; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import java.util.concurrent.Executor; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.function.Consumer; |
|||
|
|||
/** |
|||
* This class deduplicate executions of the specified function. |
|||
* Useful in cluster mode, when you get event about partition change multiple times. |
|||
* Assuming that the function execution is expensive, we should execute it immediately when first time event occurs and |
|||
* later, once the processing of first event is done, process last pending task. |
|||
* |
|||
* @param <P> parameters of the function |
|||
*/ |
|||
@Slf4j |
|||
public class EventDeduplicationExecutor<P> { |
|||
private final String name; |
|||
private final ExecutorService executor; |
|||
private final Consumer<P> function; |
|||
private P pendingTask; |
|||
private boolean busy; |
|||
|
|||
public EventDeduplicationExecutor(String name, ExecutorService executor, Consumer<P> function) { |
|||
this.name = name; |
|||
this.executor = executor; |
|||
this.function = function; |
|||
} |
|||
|
|||
public void submit(P params) { |
|||
log.info("[{}] Going to submit: {}", name, params); |
|||
synchronized (EventDeduplicationExecutor.this) { |
|||
if (!busy) { |
|||
busy = true; |
|||
pendingTask = null; |
|||
try { |
|||
log.info("[{}] Submitting task: {}", name, params); |
|||
executor.submit(() -> { |
|||
try { |
|||
log.info("[{}] Executing task: {}", name, params); |
|||
function.accept(params); |
|||
} catch (Throwable e) { |
|||
log.warn("[{}] Failed to process task with parameters: {}", name, params, e); |
|||
throw e; |
|||
} finally { |
|||
unlockAndProcessIfAny(); |
|||
} |
|||
}); |
|||
} catch (Throwable e) { |
|||
log.warn("[{}] Failed to submit task with parameters: {}", name, params, e); |
|||
unlockAndProcessIfAny(); |
|||
throw e; |
|||
} |
|||
} else { |
|||
log.info("[{}] Task is already in progress. {} pending task: {}", name, pendingTask == null ? "adding" : "updating", params); |
|||
pendingTask = params; |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void unlockAndProcessIfAny() { |
|||
synchronized (EventDeduplicationExecutor.this) { |
|||
busy = false; |
|||
if (pendingTask != null) { |
|||
submit(pendingTask); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,58 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.mail; |
|||
|
|||
import org.mockito.Mockito; |
|||
import org.mockito.invocation.InvocationOnMock; |
|||
import org.mockito.stubbing.Answer; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.context.annotation.Primary; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.thingsboard.rule.engine.api.MailService; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
|
|||
@Profile("test") |
|||
@Configuration |
|||
public class TestMailService { |
|||
|
|||
public static String currentActivateToken; |
|||
public static String currentResetPasswordToken; |
|||
|
|||
@Bean |
|||
@Primary |
|||
public MailService mailService() throws ThingsboardException { |
|||
MailService mailService = Mockito.mock(MailService.class); |
|||
Mockito.doAnswer(new Answer<Void>() { |
|||
public Void answer(InvocationOnMock invocation) { |
|||
Object[] args = invocation.getArguments(); |
|||
String activationLink = (String) args[0]; |
|||
currentActivateToken = activationLink.split("=")[1]; |
|||
return null; |
|||
} |
|||
}).when(mailService).sendActivationEmail(Mockito.anyString(), Mockito.anyString()); |
|||
Mockito.doAnswer(new Answer<Void>() { |
|||
public Void answer(InvocationOnMock invocation) { |
|||
Object[] args = invocation.getArguments(); |
|||
String passwordResetLink = (String) args[0]; |
|||
currentResetPasswordToken = passwordResetLink.split("=")[1]; |
|||
return null; |
|||
} |
|||
}).when(mailService).sendResetPasswordEmailAsync(Mockito.anyString(), Mockito.anyString()); |
|||
return mailService; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,462 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug; |
|||
|
|||
import com.fasterxml.jackson.databind.node.ArrayNode; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.eclipse.paho.mqttv5.client.IMqttToken; |
|||
import org.eclipse.paho.mqttv5.client.MqttCallback; |
|||
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; |
|||
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; |
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.eclipse.paho.mqttv5.common.MqttMessage; |
|||
import org.eclipse.paho.mqttv5.common.packet.MqttConnAck; |
|||
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; |
|||
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; |
|||
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; |
|||
import org.junit.Assert; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.TransportPayloadType; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
|||
import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
|||
import org.thingsboard.server.common.data.kv.JsonDataEntry; |
|||
import org.thingsboard.server.common.data.kv.LongDataEntry; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; |
|||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; |
|||
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; |
|||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; |
|||
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; |
|||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Calendar; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ThreadLocalRandom; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicReference; |
|||
|
|||
import static org.awaitility.Awaitility.await; |
|||
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; |
|||
import static org.thingsboard.common.util.JacksonUtil.newArrayNode; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int8; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt16; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@Slf4j |
|||
public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttIntegrationTest { |
|||
|
|||
protected MqttV5TestClient client; |
|||
protected SparkplugMqttCallback mqttCallback; |
|||
protected Calendar calendar = Calendar.getInstance(); |
|||
protected ThreadLocalRandom random = ThreadLocalRandom.current(); |
|||
|
|||
protected static final String groupId = "SparkplugBGroupId"; |
|||
protected static final String edgeNode = "SparkpluBNode"; |
|||
protected static final String keysBdSeq = "bdSeq"; |
|||
protected static final String alias = "Failed Telemetry/Attribute proto sparkplug payload. SparkplugMessageType "; |
|||
protected String deviceId = "Test Sparkplug B Device"; |
|||
protected int bdSeq = 0; |
|||
protected int seq = 0; |
|||
protected static final long PUBLISH_TS_DELTA_MS = 86400000;// Publish start TS <-> 24h
|
|||
|
|||
// NBIRTH
|
|||
protected static final String keyNodeRebirth = "Node Control/Rebirth"; |
|||
|
|||
//*BIRTH
|
|||
protected static final MetricDataType metricBirthDataType_Int32 = Int32; |
|||
protected static final String metricBirthName_Int32 = "Device Metric int32"; |
|||
protected Set<String> sparkplugAttributesMetricNames; |
|||
|
|||
public void beforeSparkplugTest() throws Exception { |
|||
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() |
|||
.gatewayName("Test Connect Sparkplug client node") |
|||
.isSparkplug(true) |
|||
.sparkplugAttributesMetricNames(sparkplugAttributesMetricNames) |
|||
.transportPayloadType(TransportPayloadType.PROTOBUF) |
|||
.build(); |
|||
processBeforeTest(configProperties); |
|||
} |
|||
|
|||
public void clientWithCorrectNodeAccessTokenWithNDEATH() throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
long value = bdSeq = 0; |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(ts, value); |
|||
} |
|||
|
|||
public void clientWithCorrectNodeAccessTokenWithNDEATH(long ts, long value) throws Exception { |
|||
IMqttToken connectionResult = clientConnectWithNDEATH(ts, value); |
|||
MqttWireMessage response = connectionResult.getResponse(); |
|||
Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType()); |
|||
MqttConnAck connAckMsg = (MqttConnAck) response; |
|||
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode()); |
|||
} |
|||
|
|||
public IMqttToken clientConnectWithNDEATH(long ts, long value, String... nameSpaceBad) throws Exception { |
|||
String key = keysBdSeq; |
|||
MetricDataType metricDataType = Int64; |
|||
SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder() |
|||
.setTimestamp(calendar.getTimeInMillis()); |
|||
deathPayload.addMetrics(createMetric(value, ts, key, metricDataType)); |
|||
byte[] deathBytes = deathPayload.build().toByteArray(); |
|||
this.client = new MqttV5TestClient(); |
|||
this.mqttCallback = new SparkplugMqttCallback(); |
|||
this.client.setCallback(this.mqttCallback); |
|||
MqttConnectionOptions options = new MqttConnectionOptions(); |
|||
options.setUserName(gatewayAccessToken); |
|||
String nameSpace = nameSpaceBad.length == 0 ? NAMESPACE : nameSpaceBad[0]; |
|||
String topic = nameSpace + "/" + groupId + "/" + SparkplugMessageType.NDEATH.name() + "/" + edgeNode; |
|||
MqttMessage msg = new MqttMessage(); |
|||
msg.setId(0); |
|||
msg.setPayload(deathBytes); |
|||
options.setWill(topic, msg); |
|||
return client.connect(options); |
|||
} |
|||
|
|||
protected List<Device> connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices, long ts) throws Exception { |
|||
List<Device> devices = new ArrayList<>(); |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
MetricDataType metricDataType = Int32; |
|||
String key = "Node Metric int32"; |
|||
int valueDeviceInt32 = 1024; |
|||
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType); |
|||
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() |
|||
.setTimestamp(ts) |
|||
.setSeq(getBdSeqNum()); |
|||
payloadBirthNode.addMetrics(metric); |
|||
payloadBirthNode.setTimestamp(ts); |
|||
if (client.isConnected()) { |
|||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, |
|||
payloadBirthNode.build().toByteArray(), 0, false); |
|||
} |
|||
|
|||
valueDeviceInt32 = 4024; |
|||
metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32); |
|||
for (int i = 0; i < cntDevices; i++) { |
|||
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() |
|||
.setTimestamp(ts) |
|||
.setSeq(getSeqNum()); |
|||
String deviceName = deviceId + "_" + i; |
|||
|
|||
payloadBirthDevice.addMetrics(metric); |
|||
if (client.isConnected()) { |
|||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, |
|||
payloadBirthDevice.build().toByteArray(), 0, false); |
|||
AtomicReference<Device> device = new AtomicReference<>(); |
|||
await(alias + "find device [" + deviceName + "] after created") |
|||
.atMost(200, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); |
|||
return device.get() != null; |
|||
}); |
|||
devices.add(device.get()); |
|||
} |
|||
|
|||
} |
|||
|
|||
Assert.assertEquals(cntDevices, devices.size()); |
|||
return devices; |
|||
} |
|||
|
|||
protected long getBdSeqNum() throws Exception { |
|||
if (bdSeq == 256) { |
|||
bdSeq = 0; |
|||
} |
|||
return bdSeq++; |
|||
} |
|||
|
|||
protected long getSeqNum() throws Exception { |
|||
if (seq == 256) { |
|||
seq = 0; |
|||
} |
|||
return seq++; |
|||
} |
|||
|
|||
protected List<String> connectionWithNBirth(MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception { |
|||
List<String> listKeys = new ArrayList<>(); |
|||
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() |
|||
.setTimestamp(calendar.getTimeInMillis()); |
|||
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; |
|||
long valueBdSec = getBdSeqNum(); |
|||
payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64)); |
|||
listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq); |
|||
payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean)); |
|||
listKeys.add(keyNodeRebirth); |
|||
|
|||
payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType)); |
|||
listKeys.add(metricKey); |
|||
|
|||
if (client.isConnected()) { |
|||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, |
|||
payloadBirthNode.build().toByteArray(), 0, false); |
|||
} |
|||
return listKeys; |
|||
} |
|||
|
|||
protected void createdAddMetricValuePrimitiveTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys, |
|||
SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException { |
|||
|
|||
String keys = "MyInt8"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt8(), ts, Int8)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyInt16"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt16(), ts, Int16)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyInt32"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt32(), ts, Int32)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyInt64"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt64(), ts, Int64)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyUInt8"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt8(), ts, UInt8)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyUInt16"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt16(), ts, UInt16)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyUInt32"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt32(), ts, UInt32)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyUInt64"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextUInt64(), ts, UInt64)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyFloat"; |
|||
listTsKvEntry.add(createdAddMetricTsKvFloat(dataPayload, keys, nextFloat(0, 100), ts, MetricDataType.Float)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyDateTime"; |
|||
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextDateTime(), ts, MetricDataType.DateTime)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyDouble"; |
|||
listTsKvEntry.add(createdAddMetricTsKvDouble(dataPayload, keys, nextDouble(), ts, MetricDataType.Double)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyBoolean"; |
|||
listTsKvEntry.add(createdAddMetricTsKvBoolean(dataPayload, keys, nextBoolean(), ts, MetricDataType.Boolean)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyString"; |
|||
listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nextString(), ts, MetricDataType.String)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyText"; |
|||
listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nextString(), ts, MetricDataType.Text)); |
|||
listKeys.add(keys); |
|||
|
|||
keys = "MyUUID"; |
|||
listTsKvEntry.add(createdAddMetricTsKvString(dataPayload, keys, nextString(), ts, MetricDataType.UUID)); |
|||
listKeys.add(keys); |
|||
|
|||
} |
|||
|
|||
protected void createdAddMetricValueArraysPrimitiveTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys, |
|||
SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException { |
|||
String keys = "MyBytesArray"; |
|||
byte[] bytes = {nextInt8(), nextInt8(), nextInt8()}; |
|||
createdAddMetricTsKvJson(dataPayload, keys, bytes, ts, Bytes, listTsKvEntry, listKeys); |
|||
} |
|||
|
|||
private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String key, Object value, |
|||
long ts, MetricDataType metricDataType) throws ThingsboardException { |
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, Long.valueOf(String.valueOf(value)))); |
|||
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); |
|||
return tsKvEntry; |
|||
} |
|||
|
|||
private TsKvEntry createdAddMetricTsKvFloat(SparkplugBProto.Payload.Builder dataPayload, String key, float value, |
|||
long ts, MetricDataType metricDataType) throws ThingsboardException { |
|||
Double dd = Double.parseDouble(Float.toString(value)); |
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(key, dd)); |
|||
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); |
|||
return tsKvEntry; |
|||
} |
|||
|
|||
private TsKvEntry createdAddMetricTsKvDouble(SparkplugBProto.Payload.Builder dataPayload, String key, double value, |
|||
long ts, MetricDataType metricDataType) throws ThingsboardException { |
|||
Long l = Double.valueOf(value).longValue(); |
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, l)); |
|||
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); |
|||
return tsKvEntry; |
|||
} |
|||
|
|||
private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String key, boolean value, |
|||
long ts, MetricDataType metricDataType) throws ThingsboardException { |
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(key, value)); |
|||
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); |
|||
return tsKvEntry; |
|||
} |
|||
|
|||
private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String key, String value, |
|||
long ts, MetricDataType metricDataType) throws ThingsboardException { |
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(key, value)); |
|||
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); |
|||
return tsKvEntry; |
|||
} |
|||
|
|||
private void createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String key, |
|||
Object values, long ts, MetricDataType metricDataType, |
|||
List<TsKvEntry> listTsKvEntry, |
|||
List<String> listKeys) throws ThingsboardException { |
|||
ArrayNode nodeArray = newArrayNode(); |
|||
switch (metricDataType) { |
|||
case Bytes: |
|||
for (byte b : (byte[]) values) { |
|||
nodeArray.add(b); |
|||
} |
|||
break; |
|||
default: |
|||
throw new IllegalStateException("Unexpected value: " + metricDataType); |
|||
} |
|||
if (nodeArray.size() > 0) { |
|||
Optional<TsKvEntry> tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(key, nodeArray.toString()))); |
|||
if (tsKvEntryOptional.isPresent()) { |
|||
dataPayload.addMetrics(createMetric(values, ts, key, metricDataType)); |
|||
listTsKvEntry.add(tsKvEntryOptional.get()); |
|||
listKeys.add(key); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private byte nextInt8() { |
|||
return (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE); |
|||
} |
|||
|
|||
private short nextUInt8() { |
|||
return (short) random.nextInt(0, Byte.MAX_VALUE * 2 + 1); |
|||
} |
|||
|
|||
private short nextInt16() { |
|||
return (short) random.nextInt(Short.MIN_VALUE, Short.MAX_VALUE); |
|||
} |
|||
|
|||
private int nextUInt16() { |
|||
return random.nextInt(0, Short.MAX_VALUE * 2 + 1); |
|||
} |
|||
|
|||
protected int nextInt32() { |
|||
return random.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE); |
|||
} |
|||
|
|||
protected long nextUInt32() { |
|||
long l = Integer.MAX_VALUE; |
|||
return random.nextLong(0, l * 2 + 1); |
|||
} |
|||
|
|||
private long nextInt64() { |
|||
return random.nextLong(Long.MIN_VALUE, Long.MAX_VALUE); |
|||
} |
|||
|
|||
private long nextUInt64() { |
|||
double d = Long.MAX_VALUE; |
|||
return random.nextLong(0, (long) (d * 2 + 1)); |
|||
} |
|||
|
|||
protected double nextDouble() { |
|||
return random.nextDouble(Long.MIN_VALUE, Long.MAX_VALUE); |
|||
} |
|||
|
|||
private long nextDateTime() { |
|||
long min = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; |
|||
long max = calendar.getTimeInMillis(); |
|||
return random.nextLong(min, max); |
|||
} |
|||
|
|||
protected float nextFloat(float min, float max) { |
|||
if (min >= max) |
|||
throw new IllegalArgumentException("max must be greater than min"); |
|||
float result = ThreadLocalRandom.current().nextFloat() * (max - min) + min; |
|||
if (result >= max) // correct for rounding
|
|||
result = Float.intBitsToFloat(Float.floatToIntBits(max) - 1); |
|||
return result; |
|||
} |
|||
|
|||
protected boolean nextBoolean() { |
|||
return random.nextBoolean(); |
|||
} |
|||
|
|||
protected String nextString() { |
|||
return java.util.UUID.randomUUID().toString(); |
|||
} |
|||
|
|||
public class SparkplugMqttCallback implements MqttCallback { |
|||
private final List<SparkplugBProto.Payload.Metric> messageArrivedMetrics = new ArrayList<>(); |
|||
|
|||
@Override |
|||
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void mqttErrorOccurred(MqttException e) { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void messageArrived(String topic, MqttMessage mqttMsg) throws Exception { |
|||
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(mqttMsg.getPayload()); |
|||
messageArrivedMetrics.addAll(sparkplugBProtoNode.getMetricsList()); |
|||
} |
|||
|
|||
@Override |
|||
public void deliveryComplete(IMqttToken iMqttToken) { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void connectComplete(boolean b, String s) { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void authPacketArrived(int i, MqttProperties mqttProperties) { |
|||
|
|||
} |
|||
|
|||
public List<SparkplugBProto.Payload.Metric> getMessageArrivedMetrics() { |
|||
return messageArrivedMetrics; |
|||
} |
|||
|
|||
public void deleteMessageArrivedMetrics(int id) { |
|||
messageArrivedMetrics.remove(id); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,432 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.attributes; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import io.netty.handler.codec.mqtt.MqttQoS; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.Assert; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; |
|||
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; |
|||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicReference; |
|||
|
|||
import static org.awaitility.Awaitility.await; |
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@Slf4j |
|||
public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends AbstractMqttV5ClientSparkplugTest { |
|||
|
|||
protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
List<String> listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); |
|||
// Shared attribute "Node Control/Rebirth" = true. type = NCMD.
|
|||
boolean value = true; |
|||
Assert.assertTrue(listKeys.contains(keyNodeRebirth)); |
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + keyNodeRebirth + "\":" + value + "}"; |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(keyNodeRebirth, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue()); |
|||
} |
|||
|
|||
/** |
|||
* If boolean - send long 0 or 1 |
|||
* If String - try to parse long |
|||
* If double - cast long |
|||
* If we can't parse, cast, or JSON there - debug the message with the id of the devise/node, tenant, |
|||
* the name and type of the attribute into an error and don't send anything. |
|||
*/ |
|||
protected void processClientWithCorrectAccessTokenPublishNCMD_BooleanType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
MetricDataType metricDataType = MetricDataType.Boolean; |
|||
String metricKey = "MyBoolean"; |
|||
Object metricValue = nextBoolean(); |
|||
connectionWithNBirth(metricDataType, metricKey, metricValue); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
|
|||
// Boolean <-> String
|
|||
boolean expectedValue = true; |
|||
String valueStr = "123"; |
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueStr + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
expectedValue = false; |
|||
valueStr = "0"; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueStr + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
// Boolean <-> Integer
|
|||
expectedValue = true; |
|||
Integer valueInt = nextInt32(); |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueInt + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
expectedValue = false; |
|||
valueInt = 0; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueInt + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue()); |
|||
} |
|||
|
|||
protected void processClientWithCorrectAccessTokenPublishNCMD_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
MetricDataType metricDataType = UInt32; |
|||
String metricKey = "MyLong"; |
|||
Object metricValue = nextUInt32(); |
|||
connectionWithNBirth(metricDataType, metricKey, metricValue); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
|
|||
// Long <-> String
|
|||
String valueStr = "123"; |
|||
long expectedValue = Long.valueOf(valueStr); |
|||
|
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueStr + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getLongValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
// Long <-> Boolean
|
|||
Boolean valueBoolean = true; |
|||
expectedValue = 1L; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getLongValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
valueBoolean = false; |
|||
expectedValue = 0L; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getLongValue()); |
|||
} |
|||
|
|||
protected void processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
MetricDataType metricDataType = MetricDataType.Float; |
|||
String metricKey = "MyFloat"; |
|||
Object metricValue = nextFloat(30, 400); |
|||
connectionWithNBirth(metricDataType, metricKey, metricValue); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
|
|||
// Float <-> String
|
|||
String valueStr = "123.345"; |
|||
float expectedValue = Float.valueOf(valueStr); |
|||
|
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueStr + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getFloatValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
// Float <-> Boolean
|
|||
Boolean valueBoolean = true; |
|||
expectedValue = 1f; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getFloatValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
valueBoolean = false; |
|||
expectedValue = 0f; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getFloatValue()); |
|||
} |
|||
|
|||
protected void processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
MetricDataType metricDataType = MetricDataType.Double; |
|||
String metricKey = "MyDouble"; |
|||
Object metricValue = nextDouble(); |
|||
connectionWithNBirth(metricDataType, metricKey, metricValue); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
|
|||
// Double <-> String
|
|||
String valueStr = "123345456"; |
|||
double expectedValue = Double.valueOf(valueStr); |
|||
|
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueStr + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getDoubleValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
// Double <-> Boolean
|
|||
Boolean valueBoolean = true; |
|||
expectedValue = 1d; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getDoubleValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
valueBoolean = false; |
|||
expectedValue = 0d; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getDoubleValue()); |
|||
} |
|||
|
|||
protected void processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
MetricDataType metricDataType = MetricDataType.String; |
|||
String metricKey = "MyString"; |
|||
Object metricValue = nextString(); |
|||
connectionWithNBirth(metricDataType, metricKey, metricValue); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
|
|||
// String <-> Long
|
|||
long valueLong = 123345456L; |
|||
String expectedValue = String.valueOf(valueLong); |
|||
|
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueLong + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getStringValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
// String <-> Boolean
|
|||
Boolean valueBoolean = true; |
|||
expectedValue = "true"; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getStringValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
valueBoolean = false; |
|||
expectedValue = "false"; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricKey + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricKey, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getStringValue()); |
|||
} |
|||
|
|||
protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); |
|||
|
|||
// Integer <-> Integer
|
|||
int expectedValueInt = 123456; |
|||
|
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + expectedValueInt + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.DBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValueInt, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); |
|||
} |
|||
|
|||
protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); |
|||
|
|||
// Int <-> String
|
|||
String valueStr = "123"; |
|||
long expectedValue = Long.valueOf(valueStr); |
|||
|
|||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueStr + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.DBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
// Int <-> Boolean
|
|||
Boolean valueBoolean = true; |
|||
expectedValue = 1; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); |
|||
mqttCallback.deleteMessageArrivedMetrics(0); |
|||
|
|||
valueBoolean = false; |
|||
expectedValue = 0; |
|||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}"; |
|||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); |
|||
} |
|||
|
|||
protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); |
|||
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; |
|||
AtomicReference<List<String>> actualKeys = new AtomicReference<>(); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { |
|||
})); |
|||
return actualKeys.get().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0)); |
|||
} |
|||
|
|||
protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); |
|||
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; |
|||
AtomicReference<List<String>> actualKeys = new AtomicReference<>(); |
|||
await(alias + SparkplugMessageType.DBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { |
|||
})); |
|||
return actualKeys.get().size() == 1; |
|||
}); |
|||
Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0)); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,55 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.attributes; |
|||
|
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
import java.util.HashSet; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@DaoSqlTest |
|||
public class MqttV5ClientSparkplugBAttributesInProfileTest extends AbstractMqttV5ClientSparkplugAttributesTest { |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
sparkplugAttributesMetricNames = new HashSet<>(); |
|||
sparkplugAttributesMetricNames.add(metricBirthName_Int32); |
|||
beforeSparkplugTest(); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest () throws MqttException { |
|||
if (client.isConnected()) { |
|||
client.disconnect(); } |
|||
} |
|||
|
|||
@Test |
|||
public void testClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { |
|||
processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { |
|||
processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,81 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.attributes; |
|||
|
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@DaoSqlTest |
|||
public class MqttV5ClientSparkplugBAttributesTest extends AbstractMqttV5ClientSparkplugAttributesTest { |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
beforeSparkplugTest(); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest () throws MqttException { |
|||
if (client.isConnected()) { |
|||
client.disconnect(); } |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception { |
|||
processClientWithCorrectAccessTokenPublishNCMDReBirth(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPublishNCMD_BooleanType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
processClientWithCorrectAccessTokenPublishNCMD_BooleanType_IfMetricFailedTypeCheck_SendValueOk(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPublishNCMD_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
processClientWithCorrectAccessTokenPublishNCMD_LongType_IfMetricFailedTypeCheck_SendValueOk(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception { |
|||
processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { |
|||
processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,178 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.connection; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.junit.Assert; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.LongDataEntry; |
|||
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; |
|||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; |
|||
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; |
|||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicReference; |
|||
|
|||
import static org.awaitility.Awaitility.await; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@Slf4j |
|||
public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends AbstractMqttV5ClientSparkplugTest { |
|||
|
|||
protected void processClientWithCorrectNodeAccessTokenWithNDEATH_Test() throws Exception { |
|||
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; |
|||
long value = bdSeq = 0; |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(ts, value); |
|||
|
|||
String keys = SparkplugMessageType.NDEATH.name() + " " + keysBdSeq; |
|||
TsKvEntry expectedTsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, value)); |
|||
AtomicReference<ListenableFuture<Optional<TsKvEntry>>> finalFuture = new AtomicReference<>(); |
|||
await(alias + SparkplugMessageType.NDEATH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), keys)); |
|||
return finalFuture.get().get().isPresent(); |
|||
}); |
|||
TsKvEntry actualTsKvEntry = finalFuture.get().get().get(); |
|||
Assert.assertEquals(expectedTsKvEntry, actualTsKvEntry); |
|||
} |
|||
|
|||
protected void processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test() throws Exception { |
|||
this.client = new MqttV5TestClient(); |
|||
MqttException actualException = Assert.assertThrows(MqttException.class, () -> client.connectAndWait(gatewayAccessToken)); |
|||
String expectedMessage = "Server unavailable."; |
|||
int expectedReasonCode = 136; |
|||
Assert.assertEquals(expectedMessage, actualException.getMessage()); |
|||
Assert.assertEquals(expectedReasonCode, actualException.getReasonCode()); |
|||
} |
|||
|
|||
protected void processClientWithCorrectNodeAccessTokenNameSpaceInvalid_Test() throws Exception { |
|||
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; |
|||
long value = bdSeq = 0; |
|||
MqttException actualException = Assert.assertThrows(MqttException.class, () -> clientConnectWithNDEATH(ts, value, "spBv1.2")); |
|||
String expectedMessage = "Server unavailable."; |
|||
int expectedReasonCode = 136; |
|||
Assert.assertEquals(expectedMessage, actualException.getMessage()); |
|||
Assert.assertEquals(expectedReasonCode, actualException.getReasonCode()); |
|||
} |
|||
|
|||
protected void processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices) throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); |
|||
} |
|||
|
|||
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(int cntDevices) throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); |
|||
|
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name())); |
|||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>(); |
|||
await(alias + messageName(STATE) + ", device: " + savedGateway.getName()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); |
|||
return finalFuture.get().get().contains(tsKvEntry); |
|||
}); |
|||
|
|||
for (Device device : devices) { |
|||
await(alias + messageName(STATE) + ", device: " + device.getName()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); |
|||
return finalFuture.get().get().contains(tsKvEntry); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(int cntDevices, int indexDeviceDisconnect) throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); |
|||
|
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name())); |
|||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>(); |
|||
|
|||
SparkplugBProto.Payload.Builder payloadDeathDevice = SparkplugBProto.Payload.newBuilder() |
|||
.setTimestamp(ts) |
|||
.setSeq(getSeqNum()); |
|||
if (client.isConnected()) { |
|||
List<Device> devicesList = new ArrayList<>(devices); |
|||
Device device = devicesList.get(indexDeviceDisconnect); |
|||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(), |
|||
payloadDeathDevice.build().toByteArray(), 0, false); |
|||
await(alias + messageName(STATE) + ", device: " + device.getName()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); |
|||
return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(int cntDevices) throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); |
|||
|
|||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name())); |
|||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>(); |
|||
|
|||
if (client.isConnected()) { |
|||
client.disconnect(); |
|||
|
|||
await(alias + messageName(STATE) + ", device: " + savedGateway.getName()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); |
|||
return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry); |
|||
}); |
|||
|
|||
List<Device> devicesList = new ArrayList<>(devices); |
|||
for (Device device : devicesList) { |
|||
await(alias + messageName(STATE) + ", device: " + device.getName()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); |
|||
return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry); |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private boolean findEqualsKeyValueInKvEntrys(List<TsKvEntry> finalFuture, TsKvEntry tsKvEntry) { |
|||
for (TsKvEntry kvEntry : finalFuture) { |
|||
if (kvEntry.getKey().equals(tsKvEntry.getKey()) && kvEntry.getValue().equals(tsKvEntry.getValue())) { |
|||
return true; |
|||
} |
|||
} |
|||
return false; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,82 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.connection; |
|||
|
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@DaoSqlTest |
|||
public class MqttV5ClientSparkplugBConnectionTest extends AbstractMqttV5ClientSparkplugConnectionTest { |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
beforeSparkplugTest(); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws MqttException { |
|||
if (client.isConnected()) { |
|||
client.disconnect(); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenWithNDEATH() throws Exception { |
|||
processClientWithCorrectNodeAccessTokenWithNDEATH_Test(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectNodeAccessTokenWithoutNDEATH() throws Exception { |
|||
processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectNodeAccessTokenNameSpaceInvalid() throws Exception { |
|||
processClientWithCorrectNodeAccessTokenNameSpaceInvalid_Test(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenWithNDEATHCreatedOneDevice() throws Exception { |
|||
processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenWithNDEATHCreatedTwoDevice() throws Exception { |
|||
processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(2); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL() throws Exception { |
|||
processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(3); |
|||
} |
|||
|
|||
@Test |
|||
public void testConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE() throws Exception { |
|||
processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(3, 1); |
|||
} |
|||
|
|||
@Test |
|||
public void testConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All() throws Exception { |
|||
processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(3); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,108 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.rpc; |
|||
|
|||
import io.netty.handler.codec.mqtt.MqttQoS; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.Assert; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; |
|||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.awaitility.Awaitility.await; |
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
import static org.thingsboard.server.common.data.exception.ThingsboardErrorCode.INVALID_ARGUMENTS; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; |
|||
|
|||
@Slf4j |
|||
public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5ClientSparkplugTest { |
|||
|
|||
private static final int metricBirthValue_Int32 = 123456; |
|||
private static final String sparkplugRpcRequest = "{\"metricName\":\"" + metricBirthName_Int32 + "\",\"value\":" + metricBirthValue_Int32 + "}"; |
|||
|
|||
@Test |
|||
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
String expected = "{\"result\":\"Success: " + SparkplugMessageType.NCMD.name() + "\"}"; |
|||
String actual = sendRPCSparkplug(NCMD.name(), sparkplugRpcRequest, savedGateway); |
|||
await(alias + SparkplugMessageType.NCMD.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(expected, actual); |
|||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); |
|||
} |
|||
|
|||
@Test |
|||
public void processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { |
|||
long ts = calendar.getTimeInMillis(); |
|||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); |
|||
String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}"; |
|||
String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0)); |
|||
await(alias + NCMD.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
return mqttCallback.getMessageArrivedMetrics().size() == 1; |
|||
}); |
|||
Assert.assertEquals(expected, actual); |
|||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); |
|||
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); |
|||
} |
|||
|
|||
@Test |
|||
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
String invalidateTypeMessageName = "RCMD"; |
|||
String expected = "{\"result\":\"" + INVALID_ARGUMENTS + "\",\"error\":\"Failed to convert device RPC command to MQTT msg: " + |
|||
invalidateTypeMessageName + "{\\\"metricName\\\":\\\"" + metricBirthName_Int32 + "\\\",\\\"value\\\":" + metricBirthValue_Int32 + "}\"}"; |
|||
String actual = sendRPCSparkplug(invalidateTypeMessageName, sparkplugRpcRequest, savedGateway); |
|||
Assert.assertEquals(expected, actual); |
|||
} |
|||
|
|||
@Test |
|||
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); |
|||
String metricNameBad = metricBirthName_Int32 + "_Bad"; |
|||
String sparkplugRpcRequestBad = "{\"metricName\":\"" + metricNameBad + "\",\"value\":" + metricBirthValue_Int32 + "}"; |
|||
String expected = "{\"result\":\"BAD_REQUEST_PARAMS\",\"error\":\"Failed send To Node Rpc Request: " + |
|||
DCMD.name() + ". This node does not have a metricName: [" + metricNameBad + "]\"}"; |
|||
String actual = sendRPCSparkplug(DCMD.name(), sparkplugRpcRequestBad, savedGateway); |
|||
Assert.assertEquals(expected, actual); |
|||
} |
|||
|
|||
private String sendRPCSparkplug(String nameTypeMessage, String keyValue, Device device) throws Exception { |
|||
String setRpcRequest = "{\"method\": \"" + nameTypeMessage + "\", \"params\": " + keyValue + "}"; |
|||
return doPostAsync("/api/plugins/rpc/twoway/" + device.getId().getId(), setRpcRequest, String.class, status().isOk()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,61 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.rpc; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
@DaoSqlTest |
|||
@Slf4j |
|||
public class MqttV5RpcSparkplugTest extends AbstractMqttV5RpcSparkplugTest { |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
beforeSparkplugTest(); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws MqttException { |
|||
if (client.isConnected()) { |
|||
client.disconnect(); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { |
|||
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { |
|||
processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { |
|||
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception { |
|||
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,113 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.timeseries; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.Assert; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; |
|||
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; |
|||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicReference; |
|||
|
|||
import static org.awaitility.Awaitility.await; |
|||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@Slf4j |
|||
public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends AbstractMqttV5ClientSparkplugTest { |
|||
|
|||
protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
List<String> listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); |
|||
Assert.assertTrue("Connection node is failed", client.isConnected()); |
|||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>(); |
|||
await(alias + SparkplugMessageType.NBIRTH.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findLatest(tenantId, savedGateway.getId(), listKeys)); |
|||
return !finalFuture.get().get().isEmpty(); |
|||
}); |
|||
Assert.assertEquals(listKeys.size(), finalFuture.get().get().size()); |
|||
} |
|||
|
|||
protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception { |
|||
List<String> listKeys = new ArrayList<>(); |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
|
|||
String messageTypeName = SparkplugMessageType.NDATA.name(); |
|||
|
|||
List<TsKvEntry> listTsKvEntry = new ArrayList<>(); |
|||
|
|||
SparkplugBProto.Payload.Builder ndataPayload = SparkplugBProto.Payload.newBuilder() |
|||
.setTimestamp(calendar.getTimeInMillis()) |
|||
.setSeq(getSeqNum()); |
|||
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; |
|||
|
|||
createdAddMetricValuePrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts); |
|||
|
|||
if (client.isConnected()) { |
|||
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, |
|||
ndataPayload.build().toByteArray(), 0, false); |
|||
} |
|||
|
|||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>(); |
|||
await(alias + SparkplugMessageType.NDATA.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); |
|||
return finalFuture.get().get().size() == (listTsKvEntry.size() + 1); |
|||
}); |
|||
Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); |
|||
} |
|||
|
|||
protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysPrimitiveSimple() throws Exception { |
|||
clientWithCorrectNodeAccessTokenWithNDEATH(); |
|||
|
|||
String messageTypeName = SparkplugMessageType.NDATA.name(); |
|||
List<String> listKeys = new ArrayList<>(); |
|||
List<TsKvEntry> listTsKvEntry = new ArrayList<>(); |
|||
|
|||
SparkplugBProto.Payload.Builder ndataPayload = SparkplugBProto.Payload.newBuilder() |
|||
.setTimestamp(calendar.getTimeInMillis()) |
|||
.setSeq(getSeqNum()); |
|||
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; |
|||
|
|||
createdAddMetricValueArraysPrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts); |
|||
|
|||
if (client.isConnected()) { |
|||
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, |
|||
ndataPayload.build().toByteArray(), 0, false); |
|||
} |
|||
|
|||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>(); |
|||
await(alias + SparkplugMessageType.NDATA.name()) |
|||
.atMost(40, TimeUnit.SECONDS) |
|||
.until(() -> { |
|||
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); |
|||
return finalFuture.get().get().size() == (listTsKvEntry.size() + 1); |
|||
}); |
|||
Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,56 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.mqtt.sparkplug.timeseries; |
|||
|
|||
import org.eclipse.paho.mqttv5.common.MqttException; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
/** |
|||
* Created by nickAS21 on 12.01.23 |
|||
*/ |
|||
@DaoSqlTest |
|||
public class MqttV5ClientSparkplugBTelemetryTest extends AbstractMqttV5ClientSparkplugTelemetryTest { |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
beforeSparkplugTest(); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest () throws MqttException { |
|||
if (client.isConnected()) { |
|||
client.disconnect(); } |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { |
|||
processClientWithCorrectAccessTokenPublishNBIRTH(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception { |
|||
processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple(); |
|||
} |
|||
|
|||
@Test |
|||
public void testClientWithCorrectAccessTokenPushNodeMetricBuildPArraysPrimitiveSimple() throws Exception { |
|||
processClientWithCorrectAccessTokenPushNodeMetricBuildArraysPrimitiveSimple(); |
|||
} |
|||
|
|||
} |
|||
@ -1,173 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.util; |
|||
|
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.After; |
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.mockito.Mockito; |
|||
import org.mockito.junit.MockitoJUnitRunner; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.utils.EventDeduplicationExecutor; |
|||
|
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.function.Consumer; |
|||
|
|||
@Slf4j |
|||
@RunWith(MockitoJUnitRunner.class) |
|||
public class EventDeduplicationExecutorTest { |
|||
|
|||
ThingsBoardThreadFactory threadFactory = ThingsBoardThreadFactory.forName(getClass().getSimpleName()); |
|||
ExecutorService executor; |
|||
|
|||
@After |
|||
public void tearDown() throws Exception { |
|||
if (executor != null) { |
|||
executor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testSimpleFlowSameThread() throws InterruptedException { |
|||
simpleFlow(MoreExecutors.newDirectExecutorService()); |
|||
} |
|||
|
|||
@Test |
|||
public void testPeriodicFlowSameThread() throws InterruptedException { |
|||
periodicFlow(MoreExecutors.newDirectExecutorService()); |
|||
} |
|||
|
|||
@Test |
|||
public void testExceptionFlowSameThread() throws InterruptedException { |
|||
exceptionFlow(MoreExecutors.newDirectExecutorService()); |
|||
} |
|||
|
|||
@Test |
|||
public void testSimpleFlowSingleThread() throws InterruptedException { |
|||
executor = Executors.newSingleThreadExecutor(threadFactory); |
|||
simpleFlow(executor); |
|||
} |
|||
|
|||
@Test |
|||
public void testPeriodicFlowSingleThread() throws InterruptedException { |
|||
executor = Executors.newSingleThreadExecutor(threadFactory); |
|||
periodicFlow(executor); |
|||
} |
|||
|
|||
@Test |
|||
public void testExceptionFlowSingleThread() throws InterruptedException { |
|||
executor = Executors.newSingleThreadExecutor(threadFactory); |
|||
exceptionFlow(executor); |
|||
} |
|||
|
|||
@Test |
|||
public void testSimpleFlowMultiThread() throws InterruptedException { |
|||
executor = Executors.newFixedThreadPool(3, threadFactory); |
|||
simpleFlow(executor); |
|||
} |
|||
|
|||
@Test |
|||
public void testPeriodicFlowMultiThread() throws InterruptedException { |
|||
executor = Executors.newFixedThreadPool(3, threadFactory); |
|||
periodicFlow(executor); |
|||
} |
|||
|
|||
@Test |
|||
public void testExceptionFlowMultiThread() throws InterruptedException { |
|||
executor = Executors.newFixedThreadPool(3, threadFactory); |
|||
exceptionFlow(executor); |
|||
} |
|||
|
|||
private void simpleFlow(ExecutorService executorService) throws InterruptedException { |
|||
try { |
|||
Consumer<String> function = Mockito.spy(StringConsumer.class); |
|||
EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function); |
|||
|
|||
String params1 = "params1"; |
|||
String params2 = "params2"; |
|||
String params3 = "params3"; |
|||
|
|||
executor.submit(params1); |
|||
executor.submit(params2); |
|||
executor.submit(params3); |
|||
Thread.sleep(500); |
|||
Mockito.verify(function).accept(params1); |
|||
Mockito.verify(function).accept(params3); |
|||
} finally { |
|||
executorService.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
private void periodicFlow(ExecutorService executorService) throws InterruptedException { |
|||
try { |
|||
Consumer<String> function = Mockito.spy(StringConsumer.class); |
|||
EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function); |
|||
|
|||
String params1 = "params1"; |
|||
String params2 = "params2"; |
|||
String params3 = "params3"; |
|||
|
|||
executor.submit(params1); |
|||
Thread.sleep(500); |
|||
executor.submit(params2); |
|||
Thread.sleep(500); |
|||
executor.submit(params3); |
|||
Thread.sleep(500); |
|||
Mockito.verify(function).accept(params1); |
|||
Mockito.verify(function).accept(params2); |
|||
Mockito.verify(function).accept(params3); |
|||
} finally { |
|||
executorService.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
private void exceptionFlow(ExecutorService executorService) throws InterruptedException { |
|||
try { |
|||
Consumer<String> function = Mockito.spy(StringConsumer.class); |
|||
EventDeduplicationExecutor<String> executor = new EventDeduplicationExecutor<>(EventDeduplicationExecutorTest.class.getSimpleName(), executorService, function); |
|||
|
|||
String params1 = "params1"; |
|||
String params2 = "params2"; |
|||
String params3 = "params3"; |
|||
|
|||
Mockito.doThrow(new RuntimeException()).when(function).accept("params1"); |
|||
executor.submit(params1); |
|||
executor.submit(params2); |
|||
Thread.sleep(500); |
|||
executor.submit(params3); |
|||
Thread.sleep(500); |
|||
Mockito.verify(function).accept(params2); |
|||
Mockito.verify(function).accept(params3); |
|||
} finally { |
|||
executorService.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
public static class StringConsumer implements Consumer<String> { |
|||
@Override |
|||
public void accept(String s) { |
|||
try { |
|||
Thread.sleep(100); |
|||
} catch (InterruptedException e) { |
|||
throw new RuntimeException(e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,85 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.alarm; |
|||
|
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.alarm.Alarm; |
|||
import org.thingsboard.server.common.data.alarm.AlarmInfo; |
|||
import org.thingsboard.server.common.data.alarm.AlarmSeverity; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
|
|||
import java.util.List; |
|||
|
|||
|
|||
@Data |
|||
public class AlarmApiCallResult { |
|||
|
|||
private final boolean successful; |
|||
private final boolean created; |
|||
private final boolean modified; |
|||
private final boolean cleared; |
|||
private final AlarmInfo alarm; |
|||
private final Alarm old; |
|||
private final List<EntityId> propagatedEntitiesList; |
|||
|
|||
@Builder |
|||
private AlarmApiCallResult(boolean successful, boolean created, boolean modified, boolean cleared, AlarmInfo alarm, Alarm old, List<EntityId> propagatedEntitiesList) { |
|||
this.successful = successful; |
|||
this.created = created; |
|||
this.modified = modified; |
|||
this.cleared = cleared; |
|||
this.alarm = alarm; |
|||
this.old = old; |
|||
this.propagatedEntitiesList = propagatedEntitiesList; |
|||
} |
|||
|
|||
public AlarmApiCallResult(AlarmApiCallResult other, List<EntityId> propagatedEntitiesList) { |
|||
this.successful = other.successful; |
|||
this.created = other.created; |
|||
this.modified = other.modified; |
|||
this.cleared = other.cleared; |
|||
this.alarm = other.alarm; |
|||
this.old = other.old; |
|||
this.propagatedEntitiesList = propagatedEntitiesList; |
|||
} |
|||
|
|||
public boolean isSeverityChanged() { |
|||
if (alarm == null || old == null) { |
|||
return false; |
|||
} else { |
|||
return !alarm.getSeverity().equals(old.getSeverity()); |
|||
} |
|||
} |
|||
|
|||
public AlarmSeverity getOldSeverity() { |
|||
return isSeverityChanged() ? old.getSeverity() : null; |
|||
} |
|||
|
|||
public boolean isPropagationChanged() { |
|||
if (created) { |
|||
return true; |
|||
} |
|||
if (alarm == null || old == null) { |
|||
return false; |
|||
} |
|||
return (alarm.isPropagate() != old.isPropagate()) || |
|||
(alarm.isPropagateToOwner() != old.isPropagateToOwner()) || |
|||
(alarm.isPropagateToTenant() != old.isPropagateToTenant()) || |
|||
(!alarm.getPropagateRelationTypes().equals(old.getPropagateRelationTypes())); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,87 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.alarm; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import io.swagger.annotations.ApiModelProperty; |
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.id.UserId; |
|||
import org.thingsboard.server.common.data.validation.Length; |
|||
import org.thingsboard.server.common.data.validation.NoXss; |
|||
|
|||
import javax.validation.Valid; |
|||
import javax.validation.constraints.NotNull; |
|||
|
|||
@Data |
|||
@Builder |
|||
public class AlarmCreateOrUpdateActiveRequest implements AlarmModificationRequest { |
|||
|
|||
@NotNull |
|||
@ApiModelProperty(position = 1, value = "JSON object with Tenant Id", accessMode = ApiModelProperty.AccessMode.READ_ONLY) |
|||
private TenantId tenantId; |
|||
@ApiModelProperty(position = 2, value = "JSON object with Customer Id", accessMode = ApiModelProperty.AccessMode.READ_ONLY) |
|||
private CustomerId customerId; |
|||
@NotNull |
|||
@ApiModelProperty(position = 3, required = true, value = "representing type of the Alarm", example = "High Temperature Alarm") |
|||
@Length(fieldName = "type") |
|||
private String type; |
|||
@NotNull |
|||
@ApiModelProperty(position = 4, required = true, value = "JSON object with alarm originator id") |
|||
private EntityId originator; |
|||
@NotNull |
|||
@ApiModelProperty(position = 5, required = true, value = "Alarm severity", example = "CRITICAL") |
|||
private AlarmSeverity severity; |
|||
@ApiModelProperty(position = 6, value = "Timestamp of the alarm start time, in milliseconds", example = "1634058704565") |
|||
private long startTs; |
|||
@ApiModelProperty(position = 7, value = "Timestamp of the alarm end time(last time update), in milliseconds", example = "1634111163522") |
|||
private long endTs; |
|||
@NoXss |
|||
@ApiModelProperty(position = 8, value = "JSON object with alarm details") |
|||
private JsonNode details; |
|||
@Valid |
|||
@ApiModelProperty(position = 9, value = "JSON object with propagation details") |
|||
private AlarmPropagationInfo propagation; |
|||
|
|||
private UserId userId; |
|||
|
|||
public static AlarmCreateOrUpdateActiveRequest fromAlarm(Alarm a) { |
|||
return fromAlarm(a, null); |
|||
} |
|||
|
|||
public static AlarmCreateOrUpdateActiveRequest fromAlarm(Alarm a, UserId userId) { |
|||
return AlarmCreateOrUpdateActiveRequest.builder() |
|||
.tenantId(a.getTenantId()) |
|||
.customerId(a.getCustomerId()) |
|||
.type(a.getType()) |
|||
.originator(a.getOriginator()) |
|||
.severity((a.getSeverity())) |
|||
.startTs(a.getStartTs()) |
|||
.endTs(a.getEndTs()) |
|||
.details(a.getDetails()) |
|||
.propagation(AlarmPropagationInfo.builder() |
|||
.propagate(a.isPropagate()) |
|||
.propagateToOwner(a.isPropagateToOwner()) |
|||
.propagateToTenant(a.isPropagateToTenant()) |
|||
.propagateRelationTypes(a.getPropagateRelationTypes()).build()) |
|||
.userId(userId) |
|||
.build(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.alarm; |
|||
|
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.id.UserId; |
|||
|
|||
public interface AlarmModificationRequest { |
|||
|
|||
TenantId getTenantId(); |
|||
|
|||
long getStartTs(); |
|||
|
|||
long getEndTs(); |
|||
|
|||
void setStartTs(long startTs); |
|||
|
|||
void setEndTs(long endTs); |
|||
|
|||
UserId getUserId(); |
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.alarm; |
|||
|
|||
import io.swagger.annotations.ApiModelProperty; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
import org.thingsboard.server.common.data.validation.NoXss; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
|
|||
@Builder |
|||
@Data |
|||
public class AlarmPropagationInfo { |
|||
|
|||
public static AlarmPropagationInfo EMPTY = new AlarmPropagationInfo(false, false, false, Collections.emptyList()); |
|||
|
|||
@ApiModelProperty(position = 1, value = "Propagation flag to specify if alarm should be propagated to parent entities of alarm originator", example = "true") |
|||
private boolean propagate; |
|||
@ApiModelProperty(position = 2, value = "Propagation flag to specify if alarm should be propagated to the owner (tenant or customer) of alarm originator", example = "true") |
|||
private boolean propagateToOwner; |
|||
@ApiModelProperty(position = 3, value = "Propagation flag to specify if alarm should be propagated to the tenant entity", example = "true") |
|||
private boolean propagateToTenant; |
|||
@NoXss |
|||
@ApiModelProperty(position = 4, value = "JSON array of relation types that should be used for propagation. " + |
|||
"By default, 'propagateRelationTypes' array is empty which means that the alarm will be propagated based on any relation type to parent entities. " + |
|||
"This parameter should be used only in case when 'propagate' parameter is set to true, otherwise, 'propagateRelationTypes' array will be ignored.") |
|||
private List<String> propagateRelationTypes; |
|||
|
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue