Browse Source

Merge branch 'master' of github.com:thingsboard/thingsboard

pull/5345/head
Igor Kulikov 5 years ago
parent
commit
b0e3547b83
  1. 2
      application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
  2. 4
      application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java
  3. 28
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java
  4. 13
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java
  5. 29
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java
  6. 16
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java
  7. 12
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java
  8. 16
      application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java
  9. 25
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java
  10. 17
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
  11. 25
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java
  12. 30
      application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java
  13. 12
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java
  14. 17
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java
  15. 91
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java
  16. 12
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java
  17. 26
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java
  18. 126
      application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java
  19. 55
      common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java
  20. 16
      common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java
  21. 14
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java
  22. 5
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java
  23. 8
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java
  24. 62
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mVersionedModelProvider.java
  25. 8
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java
  26. 5
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java
  27. 16
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java
  28. 31
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java
  29. 4
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/LwM2mUplinkMsgHandler.java
  30. 252
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  31. 56
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
  32. 14
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
  33. 67
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java
  34. 4
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
  35. 14
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java
  36. 4
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  37. 15
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
  38. 2
      tools/src/main/shell/client.keygen.sh
  39. 12
      tools/src/main/shell/keygen.properties
  40. 2
      tools/src/main/shell/server.keygen.sh
  41. 2
      ui-ngx/src/app/shared/components/button/toggle-password.component.html
  42. 3
      ui-ngx/src/app/shared/components/value-input.component.ts

2
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java

@ -244,7 +244,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
rpc.setExpirationTime(request.getExpirationTime());
rpc.setRequest(JacksonUtil.valueToTree(request));
rpc.setStatus(status);
rpc.setAdditionalInfo(JacksonUtil.valueToTree(request.getAdditionalInfo()));
rpc.setAdditionalInfo(JacksonUtil.toJsonNode(request.getAdditionalInfo()));
return systemContext.getTbRpcService().save(tenantId, rpc);
}

4
application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java

@ -177,7 +177,9 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
alarm.getLatest().computeIfAbsent(keyType, tmp -> new HashMap<>()).putAll(latestUpdate);
return alarm;
}).collect(Collectors.toList());
wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements()));
if (!update.isEmpty()) {
wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements()));
}
} else {
log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
}

28
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestIntegrationTest.java

@ -52,7 +52,17 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
@Test
public void testRequestAttributesValuesFromTheServer() throws Exception {
processTestRequestAttributesValuesFromTheServer();
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception {
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception {
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);
}
@Test
@ -60,18 +70,18 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
processTestGatewayRequestAttributesValuesFromTheServer();
}
protected void processTestRequestAttributesValuesFromTheServer() throws Exception {
protected void processTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
postAttributesAndSubscribeToTopic(savedDevice, client);
postAttributesAndSubscribeToTopic(savedDevice, client, attrPubTopic, attrSubTopic);
Thread.sleep(5000);
TestMqttCallback callback = getTestMqttCallback();
client.setCallback(callback);
validateResponse(client, callback.getLatch(), callback);
validateResponse(client, callback.getLatch(), callback, attrReqTopicPrefix);
}
protected void processTestGatewayRequestAttributesValuesFromTheServer() throws Exception {
@ -103,10 +113,10 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
validateSharedResponseGateway(client, sharedAttributesCallback);
}
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client) throws Exception {
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception {
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
client.publish(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.publish(attrPubTopic, new MqttMessage(POST_ATTRIBUTES_PAYLOAD.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value()).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
}
protected void postGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception {
@ -114,12 +124,12 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(postClientAttributes.getBytes())).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
}
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopicPrefix) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
String payloadStr = "{\"clientKeys\":\"" + keys + "\", \"sharedKeys\":\"" + keys + "\"}";
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payloadStr.getBytes());
client.publish(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
client.publish(attrReqTopicPrefix + "1", mqttMessage).waitForCompletion(TimeUnit.MINUTES.toMillis(1));
latch.await(1, TimeUnit.MINUTES);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
String expectedRequestPayload = "{\"client\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}},\"shared\":{\"attribute1\":\"value1\",\"attribute2\":true,\"attribute3\":42.0,\"attribute4\":73,\"attribute5\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}}";

13
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestJsonIntegrationTest.java

@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
@Slf4j
public abstract class AbstractMqttAttributesRequestJsonIntegrationTest extends AbstractMqttAttributesRequestIntegrationTest {
@ -36,7 +37,17 @@ public abstract class AbstractMqttAttributesRequestJsonIntegrationTest extends A
@Test
public void testRequestAttributesValuesFromTheServer() throws Exception {
processTestRequestAttributesValuesFromTheServer();
super.testRequestAttributesValuesFromTheServer();
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception {
super.testRequestAttributesValuesFromTheServerOnShortTopic();
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception {
super.testRequestAttributesValuesFromTheServerOnShortJsonTopic();
}
@Test

29
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/request/AbstractMqttAttributesRequestProtoIntegrationTest.java

@ -84,7 +84,21 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
public void testRequestAttributesValuesFromTheServer() throws Exception {
super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto",
TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
processTestRequestAttributesValuesFromTheServer();
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortTopic() throws Exception {
super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto",
TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
}
@Test
public void testRequestAttributesValuesFromTheServerOnShortProtoTopic() throws Exception {
super.processBeforeTest("Test Request attribute values from the server proto", "Gateway Test Request attribute values from the server proto",
TransportPayloadType.PROTOBUF, null, null, null, ATTRIBUTES_SCHEMA_STR, null, null, null, null, DeviceProfileProvisionType.DISABLED);
processTestRequestAttributesValuesFromTheServer(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);
}
@Test
@ -93,7 +107,10 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
processTestGatewayRequestAttributesValuesFromTheServer();
}
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client) throws Exception {
@Test
public void testRequestAttributesValuesFromTheServerOnShortJsonTopic() throws Exception { }
protected void postAttributesAndSubscribeToTopic(Device savedDevice, MqttAsyncClient client, String attrPubTopic, String attrSubTopic) throws Exception {
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", AbstractMqttAttributesIntegrationTest.POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
@ -131,8 +148,8 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
.setField(postAttributesMsgDescriptor.findFieldByName("attribute5"), jsonObject)
.build();
byte[] payload = postAttributesMsg.toByteArray();
client.publish(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, new MqttMessage(payload));
client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
client.publish(attrPubTopic, new MqttMessage(payload));
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value());
}
protected void postGatewayDeviceClientAttributes(MqttAsyncClient client) throws Exception {
@ -149,7 +166,7 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
client.publish(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, new MqttMessage(bytes));
}
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, AbstractMqttAttributesIntegrationTest.TestMqttCallback callback) throws MqttException, InterruptedException, InvalidProtocolBufferException {
protected void validateResponse(MqttAsyncClient client, CountDownLatch latch, TestMqttCallback callback, String attrReqTopic) throws MqttException, InterruptedException, InvalidProtocolBufferException {
String keys = "attribute1,attribute2,attribute3,attribute4,attribute5";
TransportApiProtos.AttributesRequest.Builder attributesRequestBuilder = TransportApiProtos.AttributesRequest.newBuilder();
attributesRequestBuilder.setClientKeys(keys);
@ -157,7 +174,7 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
TransportApiProtos.AttributesRequest attributesRequest = attributesRequestBuilder.build();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(attributesRequest.toByteArray());
client.publish(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX + "1", mqttMessage);
client.publish(attrReqTopic + "1", mqttMessage);
latch.await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
TransportProtos.GetAttributeResponseMsg expectedAttributesResponse = getExpectedAttributeResponseMsg();

16
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesIntegrationTest.java

@ -61,7 +61,17 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC);
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
}
@Test
@ -69,14 +79,14 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
processGatewayTestSubscribeToAttributesUpdates();
}
protected void processTestSubscribeToAttributesUpdates() throws Exception {
protected void processTestSubscribeToAttributesUpdates(String attrSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttCallback onUpdateCallback = getTestMqttCallback();
client.setCallback(onUpdateCallback);
client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
client.subscribe(attrSubTopic, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(1000);

12
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesJsonIntegrationTest.java

@ -36,7 +36,17 @@ public abstract class AbstractMqttAttributesUpdatesJsonIntegrationTest extends A
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
super.testSubscribeToAttributesUpdatesFromTheServer();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
super.testSubscribeToAttributesUpdatesFromTheServerOnShortTopic();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {
super.testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic();
}
@Test

16
application/src/test/java/org/thingsboard/server/transport/mqtt/attributes/updates/AbstractMqttAttributesUpdatesProtoIntegrationTest.java

@ -21,6 +21,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -46,7 +47,20 @@ public abstract class AbstractMqttAttributesUpdatesProtoIntegrationTest extends
@Test
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
super.testSubscribeToAttributesUpdatesFromTheServer();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
super.testSubscribeToAttributesUpdatesFromTheServerOnShortTopic();
}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {}
@Test
public void testSubscribeToAttributesUpdatesFromTheServerOnShortProtoTopic() throws Exception {
processTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC);
}
@Test

25
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcDefaultIntegrationTest.java

@ -21,6 +21,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.service.security.AccessValidator;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -81,12 +82,32 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab
@Test
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test

17
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java

@ -60,14 +60,14 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
asyncContextTimeoutToUseRpcPlugin = 10000L;
}
protected void processOneWayRpcTest() throws Exception {
protected void processOneWayRpcTest(String rpcSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
CountDownLatch latch = new CountDownLatch(1);
TestMqttCallback callback = new TestMqttCallback(client, latch);
client.setCallback(callback);
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE.value());
client.subscribe(rpcSubTopic, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(1000);
@ -86,9 +86,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
validateOneWayRpcGatewayResponse(deviceName, client, payloadBytes);
}
protected void processTwoWayRpcTest() throws Exception {
protected void processTwoWayRpcTest(String rpcSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
client.subscribe(rpcSubTopic, 1);
CountDownLatch latch = new CountDownLatch(1);
TestMqttCallback callback = new TestMqttCallback(client, latch);
@ -199,7 +199,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
protected MqttMessage processMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException {
MqttMessage message = new MqttMessage();
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC)) {
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) {
message.setPayload(DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8));
} else {
JsonNode requestMsgNode = JacksonUtil.toJsonNode(new String(mqttMessage.getPayload(), StandardCharset.UTF_8));
@ -232,7 +232,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
String responseTopic = requestTopic.replace("request", "response");
String responseTopic;
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) {
responseTopic = requestTopic.replace("req", "res");
} else {
responseTopic = requestTopic.replace("request", "response");
}
qoS = mqttMessage.getQos();
client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage));
latch.countDown();

25
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcJsonIntegrationTest.java

@ -21,6 +21,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
@Slf4j
public abstract class AbstractMqttServerSideRpcJsonIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest {
@ -37,12 +38,32 @@ public abstract class AbstractMqttServerSideRpcJsonIntegrationTest extends Abstr
@Test
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortJsonTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortJsonTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC);
}
@Test

30
application/src/test/java/org/thingsboard/server/transport/mqtt/rpc/AbstractMqttServerSideRpcProtoIntegrationTest.java

@ -78,12 +78,32 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
@Test
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttOneWayRpcOnShortProtoTopic() throws Exception {
processOneWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC);
}
@Test
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC);
}
@Test
public void testServerMqttTwoWayRpcOnShortProtoTopic() throws Exception {
processTwoWayRpcTest(MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC);
}
@Test
@ -118,9 +138,9 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
return builder.build();
}
protected void processTwoWayRpcTest() throws Exception {
protected void processTwoWayRpcTest(String rpcSubTopic) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
client.subscribe(rpcSubTopic, 1);
CountDownLatch latch = new CountDownLatch(1);
TestMqttCallback callback = new TestMqttCallback(client, latch);
@ -139,7 +159,7 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
protected MqttMessage processMessageArrived(String requestTopic, MqttMessage mqttMessage) throws MqttException, InvalidProtocolBufferException {
MqttMessage message = new MqttMessage();
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC)) {
if (requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC) || requestTopic.startsWith(MqttTopics.BASE_DEVICE_API_TOPIC_V2)) {
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration();
ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA);
DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA);

12
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesIntegrationTest.java

@ -60,6 +60,18 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortJsonTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

17
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesJsonIntegrationTest.java

@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import java.util.Arrays;
import java.util.List;
@ -45,12 +46,18 @@ public abstract class AbstractMqttAttributesJsonIntegrationTest extends Abstract
processJsonPayloadAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortTopic() throws Exception {
super.testPushAttributesOnShortTopic();
}
@Test
public void testPushAttributesOnShortJsonTopic() throws Exception {
super.testPushAttributesOnShortJsonTopic();
}
@Test
public void testPushAttributesGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
String payload = getGatewayAttributesJsonPayload(deviceName1, deviceName2);
processGatewayAttributesTest(expectedKeys, payload.getBytes(), deviceName1, deviceName2);
super.testPushAttributesGateway();
}
}

91
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/attributes/AbstractMqttAttributesProtoIntegrationTest.java

@ -25,6 +25,7 @@ import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.gen.transport.TransportApiProtos;
@ -49,14 +50,14 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
@Test
public void testPushAttributes() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA);
DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DynamicSchema attributesSchema = getDynamicSchema();
DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -67,7 +68,6 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
@ -78,18 +78,50 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType();
assertNotNull(postAttributesMsgDescriptor);
DynamicMessage postAttributesMsg = postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), true)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4)
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), false)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 0.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 0)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), true);
}
@Test
public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception {
public void testPushAttributesOnShortTopic() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushAttributesOnShortJsonTopic() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortProtoTopic() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushAttributesGateway() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, null);
TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
TransportApiProtos.AttributesMsg firstDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName1, expectedKeys);
TransportApiProtos.AttributesMsg secondDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName2, expectedKeys);
gatewayAttributesMsgProtoBuilder.addAllMsg(Arrays.asList(firstDeviceAttributesMsgProto, secondDeviceAttributesMsgProto));
TransportApiProtos.GatewayAttributesMsg gatewayAttributesMsg = gatewayAttributesMsgProtoBuilder.build();
processGatewayAttributesTest(expectedKeys, gatewayAttributesMsg.toByteArray(), deviceName1, deviceName2);
}
private DynamicSchema getDynamicSchema() {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
@ -97,7 +129,11 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA);
DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
return protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
}
private DynamicMessage getDefaultDynamicMessage() {
DynamicSchema attributesSchema = getDynamicSchema();
DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -108,6 +144,7 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
@ -117,25 +154,13 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
DynamicMessage.Builder postAttributesBuilder = attributesSchema.newMessageBuilder("PostAttributes");
Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType();
assertNotNull(postAttributesMsgDescriptor);
DynamicMessage postAttributesMsg = postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "")
return postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), true)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key5"), postAttributesMsg.toByteArray(), true);
}
@Test
public void testPushAttributesGateway() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, null);
TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
TransportApiProtos.AttributesMsg firstDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName1, expectedKeys);
TransportApiProtos.AttributesMsg secondDeviceAttributesMsgProto = getDeviceAttributesMsgProto(deviceName2, expectedKeys);
gatewayAttributesMsgProtoBuilder.addAllMsg(Arrays.asList(firstDeviceAttributesMsgProto, secondDeviceAttributesMsgProto));
TransportApiProtos.GatewayAttributesMsg gatewayAttributesMsg = gatewayAttributesMsgProtoBuilder.build();
processGatewayAttributesTest(expectedKeys, gatewayAttributesMsg.toByteArray(), deviceName1, deviceName2);
}
private TransportApiProtos.AttributesMsg getDeviceAttributesMsgProto(String deviceName, List<String> expectedKeys) {

12
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java

@ -73,6 +73,18 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
}
@Test
public void testPushTelemetryOnShortTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryOnShortJsonTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

26
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java

@ -57,25 +57,23 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
processJsonPayloadTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
}
@Test
public void testPushTelemetryOnShortTopic() throws Exception {
super.testPushTelemetryOnShortTopic();
}
@Test
public void testPushTelemetryWithTsOnShortJsonTopic() throws Exception {
super.testPushTelemetryOnShortJsonTopic();
}
@Test
public void testPushTelemetryGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
String payload = getGatewayTelemetryJsonPayload(deviceName1, deviceName2, "10000", "20000");
processGatewayTelemetryTest(MqttTopics.GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2);
super.testPushTelemetryGateway();
}
@Test
public void testGatewayConnect() throws Exception {
String payload = "{\"device\":\"Device A\", \"type\": \"" + TransportPayloadType.JSON.name() + "\"}";
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC);
String deviceName = "Device A";
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(device);
super.testGatewayConnect();
}
}

126
application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java

@ -21,6 +21,7 @@ import com.google.protobuf.DynamicMessage;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@ -55,41 +56,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
@Test
public void testPushTelemetry() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
DynamicMessage postTelemetryMsg = postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postTelemetryMsgDescriptor.findFieldByName("key2"), true)
.setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@ -121,14 +88,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
" }\n" +
"}";
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicSchema telemetrySchema = getDynamicSchema(schemaStr);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -173,14 +133,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
@Test
public void testPushTelemetryWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicSchema telemetrySchema = getDynamicSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -237,14 +190,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
" }\n" +
"}";
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicSchema telemetrySchema = getDynamicSchema(schemaStr);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
@ -281,6 +227,26 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), true, true);
}
@Test
public void testPushTelemetryOnShortTopic() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@Test
public void testPushTelemetryOnShortJsonTopic() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryOnShortProtoTopic() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@Test
public void testPushTelemetryGateway() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
@ -310,6 +276,48 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
assertNotNull(device);
}
private DynamicSchema getDynamicSchema(String deviceTelemetryProtoSchema) {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(deviceTelemetryProtoSchema);
return protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
}
private DynamicMessage getDefaultDynamicMessage() {
DynamicSchema telemetrySchema = getDynamicSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNumber"), 42)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
return postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "value1")
.setField(postTelemetryMsgDescriptor.findFieldByName("key2"), true)
.setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 3.0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
}
private TransportApiProtos.ConnectMsg getConnectProto(String deviceName) {
TransportApiProtos.ConnectMsg.Builder builder = TransportApiProtos.ConnectMsg.newBuilder();
builder.setDeviceName(deviceName);

55
common/data/src/main/java/org/thingsboard/server/common/data/device/profile/MqttTopics.java

@ -34,20 +34,25 @@ public class MqttTopics {
private static final String SOFTWARE = "/sw";
private static final String CHUNK = "/chunk/";
private static final String ERROR = "/error";
private static final String TELEMETRY_SHORT = "/t";
private static final String ATTRIBUTES_SHORT = "/a";
private static final String RPC_SHORT = "/r";
private static final String REQUEST_SHORT = "/req";
private static final String RESPONSE_SHORT = "/res";
private static final String JSON_SHORT = "j";
private static final String PROTO_SHORT = "p";
private static final String ATTRIBUTES_RESPONSE = ATTRIBUTES + RESPONSE;
private static final String ATTRIBUTES_REQUEST = ATTRIBUTES + REQUEST;
private static final String ATTRIBUTES_RESPONSE_SHORT = ATTRIBUTES_SHORT + RESPONSE_SHORT + "/";
private static final String ATTRIBUTES_REQUEST_SHORT = ATTRIBUTES_SHORT + REQUEST_SHORT + "/";
private static final String DEVICE_RPC_RESPONSE = RPC + RESPONSE + "/";
private static final String DEVICE_RPC_REQUEST = RPC + REQUEST + "/";
private static final String DEVICE_RPC_RESPONSE_SHORT = RPC_SHORT + RESPONSE_SHORT + "/";
private static final String DEVICE_RPC_REQUEST_SHORT = RPC_SHORT + REQUEST_SHORT + "/";
private static final String DEVICE_ATTRIBUTES_RESPONSE = ATTRIBUTES_RESPONSE + "/";
private static final String DEVICE_ATTRIBUTES_REQUEST = ATTRIBUTES_REQUEST + "/";
// V1_JSON topics
// v1 topics
public static final String BASE_DEVICE_API_TOPIC = "v1/devices/me";
public static final String DEVICE_RPC_RESPONSE_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_RESPONSE;
public static final String DEVICE_RPC_RESPONSE_SUB_TOPIC = DEVICE_RPC_RESPONSE_TOPIC + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_TOPIC = BASE_DEVICE_API_TOPIC + DEVICE_RPC_REQUEST;
@ -60,9 +65,7 @@ public class MqttTopics {
public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + ATTRIBUTES;
public static final String DEVICE_PROVISION_REQUEST_TOPIC = PROVISION + REQUEST;
public static final String DEVICE_PROVISION_RESPONSE_TOPIC = PROVISION + RESPONSE;
// V1_JSON gateway topics
// v1 gateway topics
public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway";
public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + CONNECT;
public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + DISCONNECT;
@ -72,22 +75,44 @@ public class MqttTopics {
public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC;
public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST;
public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE;
// v2 topics
public static final String BASE_DEVICE_API_TOPIC_V2 = "v2";
public static final String REQUEST_ID_PATTERN = "(?<requestId>\\d+)";
public static final String CHUNK_PATTERN = "(?<chunk>\\d+)";
public static final String DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN;
public static final String DEVICE_FIRMWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC;
public static final String DEVICE_FIRMWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + FIRMWARE + ERROR;
public static final String DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT = BASE_DEVICE_API_TOPIC_V2 + "/%s" + RESPONSE + "/%s" + CHUNK + "%d";
public static final String DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + REQUEST + "/" + REQUEST_ID_PATTERN + CHUNK + CHUNK_PATTERN;
public static final String DEVICE_SOFTWARE_RESPONSES_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + RESPONSE + "/" + SUB_TOPIC + CHUNK + SUB_TOPIC;
public static final String DEVICE_SOFTWARE_ERROR_TOPIC = BASE_DEVICE_API_TOPIC_V2 + SOFTWARE + ERROR;
public static final String DEVICE_ATTRIBUTES_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT;
public static final String DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT + "/" + JSON_SHORT;
public static final String DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_SHORT + "/" + PROTO_SHORT;
public static final String DEVICE_TELEMETRY_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT;
public static final String DEVICE_TELEMETRY_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT + "/" + JSON_SHORT;
public static final String DEVICE_TELEMETRY_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + TELEMETRY_SHORT + "/" + PROTO_SHORT;
public static final String DEVICE_RPC_RESPONSE_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT;
public static final String DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT + JSON_SHORT + "/";
public static final String DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_RESPONSE_SHORT + PROTO_SHORT + "/";
public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + SUB_TOPIC;
public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + JSON_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC = DEVICE_RPC_RESPONSE_SHORT_TOPIC + PROTO_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_SHORT_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT;
public static final String DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT + JSON_SHORT + "/";
public static final String DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC = BASE_DEVICE_API_TOPIC_V2 + DEVICE_RPC_REQUEST_SHORT + PROTO_SHORT + "/";
public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + JSON_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC = DEVICE_RPC_REQUESTS_SHORT_TOPIC + PROTO_SHORT + "/" + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_RESPONSE_SHORT;
public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + JSON_SHORT + "/";
public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX = DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX + PROTO_SHORT + "/";
public static final String DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX + SUB_TOPIC;
public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC_V2 + ATTRIBUTES_REQUEST_SHORT;
public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX = DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX + JSON_SHORT + "/";
public static final String DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX = DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX + PROTO_SHORT + "/";
private MqttTopics() {
}

16
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java

@ -20,7 +20,6 @@ import com.google.gson.JsonParser;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
@ -32,6 +31,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.coap.CoapTransportResource;
@ -44,8 +44,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(UUID sessionId, Request inbound, Descriptors.Descriptor telemetryMsgDescriptor) throws AdaptorException {
ProtoConverter.validateDescriptor(telemetryMsgDescriptor);
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), telemetryMsgDescriptor)));
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), telemetryMsgDescriptor)));
} catch (Exception e) {
throw new AdaptorException(e);
}
@ -53,8 +54,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.PostAttributeMsg convertToPostAttributes(UUID sessionId, Request inbound, Descriptors.Descriptor attributesMsgDescriptor) throws AdaptorException {
ProtoConverter.validateDescriptor(attributesMsgDescriptor);
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), attributesMsgDescriptor)));
return JsonConverter.convertToAttributesProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), attributesMsgDescriptor)));
} catch (Exception e) {
throw new AdaptorException(e);
}
@ -71,8 +73,9 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
if (requestId.isEmpty()) {
throw new AdaptorException("Request id is missing!");
} else {
ProtoConverter.validateDescriptor(rpcResponseMsgDescriptor);
try {
JsonElement response = new JsonParser().parse(dynamicMsgToJson(inbound.getPayload(), rpcResponseMsgDescriptor));
JsonElement response = new JsonParser().parse(ProtoConverter.dynamicMsgToJson(inbound.getPayload(), rpcResponseMsgDescriptor));
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId.orElseThrow(() -> new AdaptorException("Request id is missing!")))
.setPayload(response.toString()).build();
} catch (Exception e) {
@ -158,11 +161,6 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
return response;
}
private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
@Override
public int getContentFormat() {
return MediaTypeRegistry.APPLICATION_OCTET_STREAM;

14
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java

@ -15,24 +15,16 @@
*/
package org.thingsboard.server.transport.lwm2m.config;
import com.google.common.io.Resources;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.server.model.LwM2mModelProvider;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.ResourceUtils;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyStore;
@Slf4j
@ -40,10 +32,6 @@ import java.security.KeyStore;
@ConditionalOnExpression("('${service.type:null}'=='tb-transport' && '${transport.lwm2m.enabled:false}'=='true') || '${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core'")
public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig {
@Getter
@Setter
private LwM2mModelProvider modelProvider;
@Getter
@Value("${transport.lwm2m.timeout:}")
private Long timeout;
@ -147,6 +135,4 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig {
}
}
}

5
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java

@ -62,14 +62,13 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
private final LwM2mTransportContext context;
private final LwM2MTransportServerConfig config;
private final LwM2mTransportServerHelper helper;
private final OtaPackageDataCache otaPackageDataCache;
private final DefaultLwM2MUplinkMsgHandler handler;
private final CaliforniumRegistrationStore registrationStore;
private final TbSecurityStore securityStore;
private final LwM2mClientContext lwM2mClientContext;
private final TbLwM2MDtlsCertificateVerifier certificateVerifier;
private final TbLwM2MAuthorizer authorizer;
private final LwM2mVersionedModelProvider modelProvider;
private LeshanServer server;
@ -118,8 +117,6 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService {
builder.setCoapConfig(getCoapConfig(config.getPort(), config.getSecurePort(), config));
/* Define model provider (Create Models )*/
LwM2mModelProvider modelProvider = new LwM2mVersionedModelProvider(this.lwM2mClientContext, this.helper, this.context);
config.setModelProvider(modelProvider);
builder.setObjectModelProvider(modelProvider);
/* Set securityStore with new registrationStore */

8
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java

@ -97,15 +97,15 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
}
@Override
public void onResourceUpdate(@NotNull Optional<TransportProtos.ResourceUpdateMsg> resourceUpdateMsgOpt) {
if (ResourceType.LWM2M_MODEL.name().equals(resourceUpdateMsgOpt.get().getResourceType())) {
public void onResourceUpdate(TransportProtos.ResourceUpdateMsg resourceUpdateMsgOpt) {
if (ResourceType.LWM2M_MODEL.name().equals(resourceUpdateMsgOpt.getResourceType())) {
this.handler.onResourceUpdate(resourceUpdateMsgOpt);
}
}
@Override
public void onResourceDelete(@NotNull Optional<TransportProtos.ResourceDeleteMsg> resourceDeleteMsgOpt) {
if (ResourceType.LWM2M_MODEL.name().equals(resourceDeleteMsgOpt.get().getResourceType())) {
public void onResourceDelete(TransportProtos.ResourceDeleteMsg resourceDeleteMsgOpt) {
if (ResourceType.LWM2M_MODEL.name().equals(resourceDeleteMsgOpt.getResourceType())) {
this.handler.onResourceDelete(resourceDeleteMsgOpt);
}
}

62
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mVersionedModelProvider.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.transport.lwm2m.server;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.model.DefaultDDFFileValidator;
import org.eclipse.leshan.core.model.LwM2mModel;
@ -23,8 +22,11 @@ import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.server.model.LwM2mModelProvider;
import org.eclipse.leshan.server.registration.Registration;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
import java.util.ArrayList;
@ -32,47 +34,58 @@ import java.util.Base64;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.thingsboard.server.common.data.ResourceType.LWM2M_MODEL;
import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_KEY;
@Slf4j
@RequiredArgsConstructor
@Service
@TbLwM2mTransportComponent
public class LwM2mVersionedModelProvider implements LwM2mModelProvider {
/**
* int objectId
* String version ("1.01")
* Key = objectId + "##" + version
* Value = TenantId
*/
private final LwM2mClientContext lwM2mClientContext;
private final LwM2mTransportServerHelper helper;
private final LwM2mTransportContext context;
private final ConcurrentMap<TenantId, ConcurrentMap<String, ObjectModel>> models;
public LwM2mVersionedModelProvider(@Lazy LwM2mClientContext lwM2mClientContext, LwM2mTransportServerHelper helper, LwM2mTransportContext context) {
this.lwM2mClientContext = lwM2mClientContext;
this.helper = helper;
this.context = context;
this.models = new ConcurrentHashMap<>();
}
private String getKeyIdVer(Integer objectId, String version) {
return objectId != null ? objectId + LWM2M_SEPARATOR_KEY + ((version == null || version.isEmpty()) ? ObjectModel.DEFAULT_VERSION : version) : null;
}
/**
* Update repository if need
*
* @param registration
* @return
*/
@Override
public LwM2mModel getObjectModel(Registration registration) {
return new DynamicModel(registration);
}
private class DynamicModel implements LwM2mModel {
public void evict(TenantId tenantId, String key) {
if (tenantId.isNullUid()) {
models.values().forEach(m -> m.remove(key));
} else {
models.get(tenantId).remove(key);
}
}
private class DynamicModel implements LwM2mModel {
private final Registration registration;
private final TenantId tenantId;
private final Lock modelsLock;
public DynamicModel(Registration registration) {
this.registration = registration;
this.tenantId = lwM2mClientContext.getClientByEndpoint(registration.getEndpoint()).getTenantId();
this.modelsLock = new ReentrantLock();
models.computeIfAbsent(tenantId, t -> new ConcurrentHashMap<>());
}
@Override
@ -114,6 +127,25 @@ public class LwM2mVersionedModelProvider implements LwM2mModelProvider {
private ObjectModel getObjectModelDynamic(Integer objectId, String version) {
String key = getKeyIdVer(objectId, version);
ObjectModel objectModel = models.get(tenantId).get(key);
if (objectModel == null) {
modelsLock.lock();
try {
objectModel = models.get(tenantId).get(key);
if (objectModel == null) {
objectModel = getObjectModel(key);
models.get(tenantId).put(key, objectModel);
}
} finally {
modelsLock.unlock();
}
}
return objectModel;
}
private ObjectModel getObjectModel(String key) {
Optional<TbResource> tbResource = context.getTransportResourceCache().get(this.tenantId, LWM2M_MODEL, key);
return tbResource.map(resource -> helper.parseFromXmlToObjectModel(
Base64.getDecoder().decode(resource.getData()),

8
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java

@ -21,13 +21,11 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mMultipleResource;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.codec.LwM2mValueConverter;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.util.Hex;
import org.eclipse.leshan.server.model.LwM2mModelProvider;
import org.eclipse.leshan.server.registration.Registration;
import org.thingsboard.server.common.data.Device;
@ -44,7 +42,6 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -58,7 +55,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.eclipse.leshan.core.model.ResourceModel.Type.OPAQUE;
import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LWM2M_OBJECT_VERSION_DEFAULT;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.convertObjectIdToVersionedId;
@ -281,8 +277,6 @@ public class LwM2mClient implements Serializable {
.getObjectModel(pathIds.getObjectId()) : null;
}
public Collection<LwM2mResource> getNewResourceForInstance(String pathRezIdVer, Object params, LwM2mModelProvider modelProvider,
LwM2mValueConverter converter) {
LwM2mPath pathIds = new LwM2mPath(fromVersionedIdToObjectId(pathRezIdVer));
@ -378,7 +372,7 @@ public class LwM2mClient implements Serializable {
this.lock = new ReentrantLock();
}
public long updateLastUplinkTime(){
public long updateLastUplinkTime() {
this.lastUplinkTime = System.currentTimeMillis();
this.firstEdrxDownlink = true;
return lastUplinkTime;

5
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java

@ -40,6 +40,7 @@ import org.thingsboard.server.transport.lwm2m.config.LwM2mVersion;
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService;
import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore;
@ -74,6 +75,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
private final TbLwM2MClientStore clientStore;
private final LwM2MSessionManager sessionManager;
private final TransportDeviceProfileCache deviceProfileCache;
private final LwM2mVersionedModelProvider modelProvider;
@Autowired
@Lazy
@ -543,8 +545,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
}
private boolean validateResourceInModel(LwM2mClient lwM2mClient, String pathIdVer, boolean isWritableNotOptional) {
ResourceModel resourceModel = lwM2mClient.getResourceModel(pathIdVer, this.config
.getModelProvider());
ResourceModel resourceModel = lwM2mClient.getResourceModel(pathIdVer, modelProvider);
Integer objectId = new LwM2mPath(fromVersionedIdToObjectId(pathIdVer)).getObjectId();
String objectVer = validateObjectVerFromKey(pathIdVer);
return resourceModel != null && (isWritableNotOptional ?

16
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java

@ -60,6 +60,7 @@ import org.thingsboard.server.common.data.device.data.lwm2m.ObjectAttributes;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
import org.thingsboard.server.transport.lwm2m.server.common.LwM2MExecutorAwareService;
@ -99,6 +100,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
private final LwM2MTransportServerConfig config;
private final LwM2MTelemetryLogService logService;
private final LwM2mClientContext clientContext;
private final LwM2mVersionedModelProvider modelProvider;
@PostConstruct
public void init() {
@ -124,7 +126,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
@Override
public void sendReadRequest(LwM2mClient client, TbLwM2MReadRequest request, DownlinkRequestCallback<ReadRequest, ReadResponse> callback) {
validateVersionedId(client, request);
ReadRequest downlink = new ReadRequest(getRequestContentFormat(client, request, this.config.getModelProvider()), request.getObjectId());
ReadRequest downlink = new ReadRequest(getRequestContentFormat(client, request, modelProvider), request.getObjectId());
sendSimpleRequest(client, downlink, request.getTimeout(), callback);
}
@ -145,7 +147,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
Set<Observation> observations = context.getServer().getObservationService().getObservations(client.getRegistration());
if (observations.stream().noneMatch(observation -> observation.getPath().equals(resultIds))) {
ObserveRequest downlink;
ContentFormat contentFormat = getRequestContentFormat(client, request, this.config.getModelProvider());
ContentFormat contentFormat = getRequestContentFormat(client, request, modelProvider);
if (resultIds.isResource()) {
downlink = new ObserveRequest(contentFormat, resultIds.getObjectId(), resultIds.getObjectInstanceId(), resultIds.getResourceId());
} else if (resultIds.isObjectInstance()) {
@ -174,7 +176,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
@Override
public void sendExecuteRequest(LwM2mClient client, TbLwM2MExecuteRequest request, DownlinkRequestCallback<ExecuteRequest, ExecuteResponse> callback) {
ResourceModel resourceModelExecute = client.getResourceModel(request.getVersionedId(), this.config.getModelProvider());
ResourceModel resourceModelExecute = client.getResourceModel(request.getVersionedId(), modelProvider);
if (resourceModelExecute != null) {
ExecuteRequest downlink;
if (request.getParams() != null && !resourceModelExecute.multiple) {
@ -231,7 +233,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
@Override
public void sendWriteReplaceRequest(LwM2mClient client, TbLwM2MWriteReplaceRequest request, DownlinkRequestCallback<WriteRequest, WriteResponse> callback) {
ResourceModel resourceModelWrite = client.getResourceModel(request.getVersionedId(), this.config.getModelProvider());
ResourceModel resourceModelWrite = client.getResourceModel(request.getVersionedId(), modelProvider);
if (resourceModelWrite != null) {
ContentFormat contentFormat = convertResourceModelTypeToContentFormat(client, resourceModelWrite.type);
try {
@ -267,8 +269,8 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
* send request: path = '/3/0' node == wM2mObjectInstance
* with params == "\"resources\": {15: resource:{id:15. value:'+01'...}}
**/
Collection<LwM2mResource> resources = client.getNewResourceForInstance(request.getVersionedId(), request.getValue(), this.config.getModelProvider(), this.converter);
ResourceModel resourceModelWrite = client.getResourceModel(request.getVersionedId(), this.config.getModelProvider());
Collection<LwM2mResource> resources = client.getNewResourceForInstance(request.getVersionedId(), request.getValue(), modelProvider, this.converter);
ResourceModel resourceModelWrite = client.getResourceModel(request.getVersionedId(), modelProvider);
ContentFormat contentFormat = request.getObjectContentFormat() != null ? request.getObjectContentFormat() : convertResourceModelTypeToContentFormat(client, resourceModelWrite.type);
WriteRequest downlink = new WriteRequest(WriteRequest.Mode.UPDATE, contentFormat, resultIds.getObjectId(),
resultIds.getObjectInstanceId(), resources);
@ -279,7 +281,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
* int rscId = resultIds.getObjectInstanceId();
* contentFormat Format of the payload (TLV or JSON).
*/
Collection<LwM2mResource> resources = client.getNewResourcesForInstance(request.getVersionedId(), request.getValue(), this.config.getModelProvider(), this.converter);
Collection<LwM2mResource> resources = client.getNewResourcesForInstance(request.getVersionedId(), request.getValue(), modelProvider, this.converter);
if (resources.size() > 0) {
ContentFormat contentFormat = request.getObjectContentFormat() != null ? request.getObjectContentFormat() : ContentFormat.DEFAULT;
WriteRequest downlink = new WriteRequest(WriteRequest.Mode.UPDATE, contentFormat, resultIds.getObjectId(), resultIds.getObjectInstanceId(), resources);

31
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java

@ -46,6 +46,7 @@ import org.thingsboard.server.common.data.device.data.lwm2m.ObjectAttributes;
import org.thingsboard.server.common.data.device.data.lwm2m.OtherConfiguration;
import org.thingsboard.server.common.data.device.data.lwm2m.TelemetryMappingConfiguration;
import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
@ -57,6 +58,7 @@ import org.thingsboard.server.transport.lwm2m.server.LwM2mOtaConvert;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider;
import org.thingsboard.server.transport.lwm2m.server.attributes.LwM2MAttributesService;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientState;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientStateException;
@ -139,8 +141,8 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
private final LwM2mTransportServerHelper helper;
private final TbLwM2MDtlsSessionStore sessionStore;
private final LwM2mClientContext clientContext;
private final LwM2MRpcRequestHandler rpcHandler;
private final LwM2mDownlinkMsgHandler defaultLwM2MDownlinkMsgHandler;
private final LwM2mVersionedModelProvider modelProvider;
public DefaultLwM2MUplinkMsgHandler(TransportService transportService,
LwM2MTransportServerConfig config,
@ -150,9 +152,10 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
LwM2MSessionManager sessionManager,
@Lazy LwM2MOtaUpdateService otaService,
@Lazy LwM2MAttributesService attributesService,
@Lazy LwM2MRpcRequestHandler rpcHandler,
@Lazy LwM2mDownlinkMsgHandler defaultLwM2MDownlinkMsgHandler,
LwM2mTransportContext context, TbLwM2MDtlsSessionStore sessionStore) {
LwM2mTransportContext context,
TbLwM2MDtlsSessionStore sessionStore,
LwM2mVersionedModelProvider modelProvider) {
this.transportService = transportService;
this.sessionManager = sessionManager;
this.attributesService = attributesService;
@ -161,10 +164,10 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
this.helper = helper;
this.clientContext = clientContext;
this.logService = logService;
this.rpcHandler = rpcHandler;
this.defaultLwM2MDownlinkMsgHandler = defaultLwM2MDownlinkMsgHandler;
this.context = context;
this.sessionStore = sessionStore;
this.modelProvider = modelProvider;
}
@PostConstruct
@ -309,7 +312,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
public void onUpdateValueAfterReadResponse(Registration registration, String path, ReadResponse response) {
if (response.getContent() != null) {
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path, this.config.getModelProvider());
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(path, modelProvider);
if (objectModelVersion != null) {
if (response.getContent() instanceof LwM2mObject) {
LwM2mObject lwM2mObject = (LwM2mObject) response.getContent();
@ -388,15 +391,19 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
}
@Override
public void onResourceUpdate(Optional<TransportProtos.ResourceUpdateMsg> resourceUpdateMsgOpt) {
String idVer = resourceUpdateMsgOpt.get().getResourceKey();
clientContext.getLwM2mClients().forEach(e -> e.updateResourceModel(idVer, this.config.getModelProvider()));
public void onResourceUpdate(TransportProtos.ResourceUpdateMsg resourceUpdateMsgOpt) {
String idVer = resourceUpdateMsgOpt.getResourceKey();
TenantId tenantId = new TenantId(new UUID(resourceUpdateMsgOpt.getTenantIdMSB(), resourceUpdateMsgOpt.getTenantIdLSB()));
modelProvider.evict(tenantId, idVer);
clientContext.getLwM2mClients().forEach(e -> e.updateResourceModel(idVer, modelProvider));
}
@Override
public void onResourceDelete(Optional<TransportProtos.ResourceDeleteMsg> resourceDeleteMsgOpt) {
String pathIdVer = resourceDeleteMsgOpt.get().getResourceKey();
clientContext.getLwM2mClients().forEach(e -> e.deleteResources(pathIdVer, this.config.getModelProvider()));
public void onResourceDelete(TransportProtos.ResourceDeleteMsg resourceDeleteMsgOpt) {
String pathIdVer = resourceDeleteMsgOpt.getResourceKey();
TenantId tenantId = new TenantId(new UUID(resourceDeleteMsgOpt.getTenantIdMSB(), resourceDeleteMsgOpt.getTenantIdLSB()));
modelProvider.evict(tenantId, pathIdVer);
clientContext.getLwM2mClients().forEach(e -> e.deleteResources(pathIdVer, modelProvider));
}
/**
@ -544,7 +551,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
*/
private void updateResourcesValue(LwM2mClient lwM2MClient, LwM2mResource lwM2mResource, String path) {
Registration registration = lwM2MClient.getRegistration();
if (lwM2MClient.saveResourceValue(path, lwM2mResource, this.config.getModelProvider())) {
if (lwM2MClient.saveResourceValue(path, lwM2mResource, modelProvider)) {
if (path.equals(convertObjectIdToVersionedId(FW_NAME_ID, registration))) {
otaService.onCurrentFirmwareNameUpdate(lwM2MClient, (String) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(FW_3_VER_ID, registration))) {

4
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/LwM2mUplinkMsgHandler.java

@ -48,9 +48,9 @@ public interface LwM2mUplinkMsgHandler {
void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt);
void onResourceUpdate(Optional<TransportProtos.ResourceUpdateMsg> resourceUpdateMsgOpt);
void onResourceUpdate(TransportProtos.ResourceUpdateMsg resourceUpdateMsgOpt);
void onResourceDelete(Optional<TransportProtos.ResourceDeleteMsg> resourceDeleteMsgOpt);
void onResourceDelete(TransportProtos.ResourceDeleteMsg resourceDeleteMsgOpt);
void onAwakeDev(Registration registration);

252
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -101,8 +101,6 @@ import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN;
/**
* @author Andrew Shvayka
@ -110,8 +108,8 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC
@Slf4j
public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE";
@ -133,6 +131,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final ConcurrentHashMap<String, Integer> chunkSizes;
private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck;
private TopicType attrSubTopicType;
private TopicType rpcSubTopicType;
private TopicType attrReqTopicType;
private TopicType toServerRpcSubTopicType;
MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) {
this.sessionId = UUID.randomUUID();
this.context = context;
@ -355,14 +358,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V1;
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V1;
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
@ -370,6 +375,57 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE);
} else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) {
getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V2_JSON;
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V2_PROTO;
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
toServerRpcSubTopicType = TopicType.V2;
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V2_JSON;
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V2_PROTO;
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
attrReqTopicType = TopicType.V2;
} else {
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
ack(ctx, msgId);
@ -541,19 +597,53 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
registerSubQoS(topic, grantedQoSList, reqQoS);
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC: {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
registerSubQoS(topic, grantedQoSList, reqQoS);
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_JSON);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
processRpcSubscribe(grantedQoSList, topic, reqQoS, TopicType.V2_PROTO);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
@ -580,6 +670,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
private void processRpcSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
rpcSubTopicType = topicType;
registerSubQoS(topic, grantedQoSList, reqQoS);
}
private void processAttributesSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
attrSubTopicType = topicType;
registerSubQoS(topic, grantedQoSList, reqQoS);
}
private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS));
@ -595,18 +697,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
mqttQoSMap.remove(new MqttTopicMatcher(topicName));
try {
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: {
activityReported = true;
break;
}
}
} catch (Exception e) {
log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
@ -837,8 +964,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
log.trace("[{}] Received get attributes response", sessionId);
String topicBase;
MqttTransportAdaptor adaptor;
switch (attrReqTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, response).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
adaptor.convertToPublish(deviceSessionCtx, response, topicBase).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
}
@ -847,8 +995,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to device", sessionId);
log.info("[{}] : attrSubTopicType: {}", notification.toString(), attrSubTopicType);
String topic;
MqttTransportAdaptor adaptor;
switch (attrSubTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
topic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, notification).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
}
@ -862,9 +1031,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
log.trace("[{}] Received RPC command to device", sessionId);
log.info("[{}] Received RPC command to device", sessionId);
String baseTopic;
MqttTransportAdaptor adaptor;
switch (rpcSubTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic).ifPresent(payload -> {
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
if (isAckExpected(payload)) {
rpcAwaitingAck.put(msgId, rpcRequest);
@ -898,9 +1087,29 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
log.trace("[{}] Received RPC command to server", sessionId);
log.trace("[{}] Received RPC response from server", sessionId);
String baseTopic;
MqttTransportAdaptor adaptor;
switch (toServerRpcSubTopicType) {
case V2:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC;
break;
case V2_JSON:
adaptor = context.getJsonMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
adaptor = context.getProtoMqttAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC;
break;
default:
adaptor = deviceSessionCtx.getPayloadAdaptor();
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
break;
}
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcResponse).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
adaptor.convertToPublish(deviceSessionCtx, rpcResponse, baseTopic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
}
@ -923,4 +1132,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
}
private enum TopicType {
V1, V2, V2_JSON, V2_PROTO
}
}

56
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java

@ -64,6 +64,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode post telemetry request", ex);
throw new AdaptorException(ex);
}
}
@ -74,6 +75,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode post attributes request", ex);
throw new AdaptorException(ex);
}
}
@ -84,6 +86,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
try {
return JsonConverter.convertToClaimDeviceProto(ctx.getDeviceId(), payload);
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode claim device request", ex);
throw new AdaptorException(ex);
}
}
@ -99,33 +102,33 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
return processGetAttributeRequestMsg(inbound, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
return processGetAttributeRequestMsg(inbound, topicBase);
}
@Override
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
return processToDeviceRpcResponseMsg(inbound, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
return processToDeviceRpcResponseMsg(inbound, topicBase);
}
@Override
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
return processToServerRpcRequestMsg(ctx, inbound, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
return processToServerRpcRequestMsg(ctx, inbound, topicBase);
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
return processConvertFromAttributeResponseMsg(ctx, responseMsg, MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX);
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
return processConvertFromAttributeResponseMsg(ctx, responseMsg, topicBase);
}
@Override
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg);
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg)));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
return Optional.of(createMqttPublishMsg(ctx, topic, JsonConverter.toJson(notificationMsg)));
}
@Override
@ -135,8 +138,8 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
}
@Override
@ -145,8 +148,8 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
}
@Override
@ -169,11 +172,11 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
protected TransportProtos.GetAttributeRequestMsg processGetAttributeRequestMsg(MqttPublishMessage inbound, String topic) throws AdaptorException {
private TransportProtos.GetAttributeRequestMsg processGetAttributeRequestMsg(MqttPublishMessage inbound, String topicBase) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
result.setRequestId(getRequestId(topicName, topic));
result.setRequestId(getRequestId(topicName, topicBase));
String payload = inbound.payload().toString(UTF8);
JsonElement requestBody = new JsonParser().parse(payload);
Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
@ -191,49 +194,50 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
protected TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(MqttPublishMessage inbound, String topic) throws AdaptorException {
private TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(MqttPublishMessage inbound, String topicBase) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
int requestId = getRequestId(topicName, topic);
int requestId = getRequestId(topicName, topicBase);
String payload = inbound.payload().toString(UTF8);
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
} catch (RuntimeException e) {
log.warn("Failed to decode Rpc response", e);
log.warn("Failed to decode rpc response", e);
throw new AdaptorException(e);
}
}
protected TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topic) throws AdaptorException {
private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
try {
int requestId = getRequestId(topicName, topic);
int requestId = getRequestId(topicName, topicBase);
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
} catch (IllegalStateException | JsonSyntaxException ex) {
log.warn("Failed to decode to server rpc request", ex);
throw new AdaptorException(ex);
}
}
protected Optional<MqttMessage> processConvertFromAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topic) throws AdaptorException {
private Optional<MqttMessage> processConvertFromAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
throw new AdaptorException(responseMsg.getError());
} else {
int requestId = responseMsg.getRequestId();
if (requestId >= 0) {
return Optional.of(createMqttPublishMsg(ctx,
topic + requestId,
topicBase + requestId,
JsonConverter.toJson(responseMsg)));
}
return Optional.empty();
}
}
protected Optional<MqttMessage> processConvertFromGatewayAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, String topic) throws AdaptorException {
private Optional<MqttMessage> processConvertFromGatewayAttributeResponseMsg(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
throw new AdaptorException(responseMsg.getError());
} else {
JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, responseMsg);
return Optional.of(createMqttPublishMsg(ctx, topic, result));
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, result));
}
}

14
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java

@ -52,27 +52,27 @@ public interface MqttTransportAdaptor {
PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;

67
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java

@ -20,7 +20,6 @@ import com.google.gson.JsonParser;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
@ -49,10 +48,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
Descriptors.Descriptor telemetryDynamicMsgDescriptor = getDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor());
Descriptors.Descriptor telemetryDynamicMsgDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor());
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor)));
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor)));
} catch (Exception e) {
log.warn("Failed to decode post telemetry request", e);
throw new AdaptorException(e);
}
}
@ -61,10 +61,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
Descriptors.Descriptor attributesDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
Descriptors.Descriptor attributesDynamicMessageDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor)));
return JsonConverter.convertToAttributesProto(new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, attributesDynamicMessageDescriptor)));
} catch (Exception e) {
log.warn("Failed to decode post attributes request", e);
throw new AdaptorException(e);
}
}
@ -75,16 +76,17 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
try {
return ProtoConverter.convertToClaimDeviceProto(ctx.getDeviceId(), bytes);
} catch (InvalidProtocolBufferException e) {
log.warn("Failed to decode claim device request", e);
throw new AdaptorException(e);
}
}
@Override
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
byte[] bytes = toBytes(inbound.payload());
String topicName = inbound.variableHeader().topicName();
try {
int requestId = getRequestId(topicName, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
int requestId = getRequestId(topicName, topicBase);
return ProtoConverter.convertToGetAttributeRequestMessage(bytes, requestId);
} catch (InvalidProtocolBufferException e) {
log.warn("Failed to decode get attributes request", e);
@ -93,29 +95,30 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
}
@Override
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException {
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
String topicName = mqttMsg.variableHeader().topicName();
byte[] bytes = toBytes(mqttMsg.payload());
Descriptors.Descriptor rpcResponseDynamicMessageDescriptor = getDescriptor(deviceSessionCtx.getRpcResponseDynamicMessageDescriptor());
Descriptors.Descriptor rpcResponseDynamicMessageDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getRpcResponseDynamicMessageDescriptor());
try {
int requestId = getRequestId(topicName, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
JsonElement response = new JsonParser().parse(dynamicMsgToJson(bytes, rpcResponseDynamicMessageDescriptor));
int requestId = getRequestId(topicName, topicBase);
JsonElement response = new JsonParser().parse(ProtoConverter.dynamicMsgToJson(bytes, rpcResponseDynamicMessageDescriptor));
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(response.toString()).build();
} catch (Exception e) {
log.warn("Failed to decode Rpc response", e);
log.warn("Failed to decode rpc response", e);
throw new AdaptorException(e);
}
}
@Override
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException {
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException {
byte[] bytes = toBytes(mqttMsg.payload());
String topicName = mqttMsg.variableHeader().topicName();
try {
int requestId = getRequestId(topicName, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
int requestId = getRequestId(topicName, topicBase);
return ProtoConverter.convertToServerRpcRequest(bytes, requestId);
} catch (InvalidProtocolBufferException e) {
log.warn("Failed to decode to server rpc request", e);
throw new AdaptorException(e);
}
}
@ -126,40 +129,43 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
try {
return ProtoConverter.convertToProvisionRequestMsg(bytes);
} catch (InvalidProtocolBufferException ex) {
log.warn("Failed to decode provision request", ex);
throw new AdaptorException(ex);
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
throw new AdaptorException(responseMsg.getError());
} else {
int requestId = responseMsg.getRequestId();
if (requestId >= 0) {
return Optional.of(createMqttPublishMsg(ctx,
MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
responseMsg.toByteArray()));
return Optional.of(createMqttPublishMsg(ctx, topicBase + requestId, responseMsg.toByteArray()));
}
return Optional.empty();
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
DynamicMessage.Builder rpcRequestDynamicMessageBuilder = deviceSessionCtx.getRpcRequestDynamicMessageBuilder();
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder)));
if (rpcRequestDynamicMessageBuilder == null) {
throw new AdaptorException("Failed to get rpcRequestDynamicMessageBuilder!");
} else {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcRequest.getRequestId(), ProtoConverter.convertToRpcRequest(rpcRequest, rpcRequestDynamicMessageBuilder)));
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), rpcResponse.toByteArray()));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
return Optional.of(createMqttPublishMsg(ctx, topicBase + rpcResponse.getRequestId(), rpcResponse.toByteArray()));
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, notificationMsg.toByteArray()));
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
return Optional.of(createMqttPublishMsg(ctx, topic, notificationMsg.toByteArray()));
}
@Override
@ -213,17 +219,4 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
private int getRequestId(String topicName, String topic) {
return Integer.parseInt(topicName.substring(topic.length()));
}
private Descriptors.Descriptor getDescriptor(Descriptors.Descriptor descriptor) throws AdaptorException {
if (descriptor == null) {
throw new AdaptorException("Failed to get dynamic message descriptor!");
}
return descriptor;
}
private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
}

4
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java

@ -56,7 +56,7 @@ public interface SessionMsgListener {
default void onDeviceDeleted(DeviceId deviceId) {}
default void onResourceUpdate(Optional<TransportProtos.ResourceUpdateMsg> resourceUpdateMsgOpt) {}
default void onResourceUpdate(TransportProtos.ResourceUpdateMsg resourceUpdateMsgOpt) {}
default void onResourceDelete(Optional<TransportProtos.ResourceDeleteMsg> resourceUpdateMsgOpt) {}
default void onResourceDelete(TransportProtos.ResourceDeleteMsg resourceUpdateMsgOpt) {}
}

14
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java

@ -19,6 +19,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
@ -190,4 +191,17 @@ public class ProtoConverter {
throw new AdaptorException("Failed to convert ToDeviceRpcRequestMsg to Dynamic Rpc request message due to: ", e);
}
}
public static Descriptors.Descriptor validateDescriptor(Descriptors.Descriptor descriptor) throws AdaptorException {
if (descriptor == null) {
throw new AdaptorException("Failed to get dynamic message descriptor!");
}
return descriptor;
}
public static String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
}

4
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -908,7 +908,7 @@ public class DefaultTransportService implements TransportService {
transportResourceCache.update(tenantId, resourceType, resourceId);
sessions.forEach((id, mdRez) -> {
log.warn("ResourceUpdate - [{}] [{}]", id, mdRez);
transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceUpdate(Optional.ofNullable(msg)));
transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceUpdate(msg));
});
} else if (toSessionMsg.hasResourceDeleteMsg()) {
@ -919,7 +919,7 @@ public class DefaultTransportService implements TransportService {
transportResourceCache.evict(tenantId, resourceType, resourceId);
sessions.forEach((id, mdRez) -> {
log.warn("ResourceDelete - [{}] [{}]", id, mdRez);
transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(Optional.ofNullable(msg)));
transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg));
});
} else {
//TODO: should we notify the device actor about missed session?

15
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java

@ -95,15 +95,8 @@ public class TbSendRPCRequestNode implements TbNode {
tmp = msg.getMetaData().getValue(DataConstants.RETRIES);
Integer retries = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : null;
String params;
JsonElement paramsEl = json.get("params");
if (paramsEl.isJsonPrimitive()) {
params = paramsEl.getAsString();
} else {
params = gson.toJson(paramsEl);
}
String additionalInfo = gson.toJson(json.get(DataConstants.ADDITIONAL_INFO));
String params = parseJsonData(json.get("params"));
String additionalInfo = parseJsonData(json.get(DataConstants.ADDITIONAL_INFO));
RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
.oneway(oneway)
@ -144,4 +137,8 @@ public class TbSendRPCRequestNode implements TbNode {
return gson.toJson(json);
}
private String parseJsonData(JsonElement paramsEl) {
return paramsEl.isJsonPrimitive() ? paramsEl.getAsString() : gson.toJson(paramsEl);
}
}

2
tools/src/main/shell/client.keygen.sh

@ -96,7 +96,7 @@ keytool -genkeypair -v \
-keypass $CLIENT_KEY_PASSWORD \
-storepass $CLIENT_KEYSTORE_PASSWORD \
-keyalg $CLIENT_KEY_ALG \
-keysize $CLIENT_KEY_SIZE\
-groupname $CLIENT_KEY_GROUP_NAME \
-validity 9999 \
-dname "CN=$DOMAIN_SUFFIX, OU=$ORGANIZATIONAL_UNIT, O=$ORGANIZATION, L=$CITY, ST=$STATE_OR_PROVINCE, C=$TWO_LETTER_COUNTRY_CODE"

12
tools/src/main/shell/keygen.properties

@ -16,8 +16,8 @@
DOMAIN_SUFFIX="$(hostname)"
SUBJECT_ALTERNATIVE_NAMES="ip:127.0.0.1"
ORGANIZATIONAL_UNIT=Thingsboard
ORGANIZATION=Thingsboard
ORGANIZATIONAL_UNIT=ThingsBoard
ORGANIZATION=ThingsBoard
CITY="San Francisco"
STATE_OR_PROVINCE=CA
TWO_LETTER_COUNTRY_CODE=US
@ -27,8 +27,8 @@ SERVER_KEY_PASSWORD=password
SERVER_KEY_ALIAS="serveralias"
SERVER_FILE_PREFIX="server"
SERVER_KEY_ALG="RSA"
SERVER_KEY_SIZE="2048"
SERVER_KEY_ALG="EC"
SERVER_KEY_GROUP_NAME="secp256r1"
SERVER_KEYSTORE_DIR="/etc/thingsboard/conf"
CLIENT_KEYSTORE_PASSWORD=password
@ -36,5 +36,5 @@ CLIENT_KEY_PASSWORD=password
CLIENT_KEY_ALIAS="clientalias"
CLIENT_FILE_PREFIX="client"
CLIENT_KEY_ALG="RSA"
CLIENT_KEY_SIZE="2048"
CLIENT_KEY_ALG="EC"
CLIENT_KEY_GROUP_NAME="secp256r1"

2
tools/src/main/shell/server.keygen.sh

@ -105,7 +105,7 @@ keytool -genkeypair -v \
-keypass $SERVER_KEY_PASSWORD \
-storepass $SERVER_KEYSTORE_PASSWORD \
-keyalg $SERVER_KEY_ALG \
-keysize $SERVER_KEY_SIZE \
-groupname $SERVER_KEY_GROUP_NAME \
-validity 9999 \
$EXT

2
ui-ngx/src/app/shared/components/button/toggle-password.component.html

@ -15,6 +15,6 @@
limitations under the License.
-->
<button mat-icon-button type="button" (click)="togglePassword($event)" [attr.aria-pressed]="showPassword" *ngIf="!hideToggle">
<button mat-icon-button type="button" tabindex="-1" (click)="togglePassword($event)" [attr.aria-pressed]="showPassword" *ngIf="!hideToggle">
<mat-icon>{{ showPassword ? 'visibility' : 'visibility_off' }}</mat-icon>
</button>

3
ui-ngx/src/app/shared/components/value-input.component.ts

@ -124,8 +124,9 @@ export class ValueInputComponent implements OnInit, ControlValueAccessor {
onValueTypeChanged() {
if (this.valueType === ValueType.BOOLEAN) {
this.modelValue = false;
} if (this.valueType === ValueType.JSON) {
} else if (this.valueType === ValueType.JSON) {
this.modelValue = {};
this.inputForm.form.get('value').patchValue({});
} else {
this.modelValue = null;
}

Loading…
Cancel
Save