|
|
|
@ -46,11 +46,11 @@ import org.thingsboard.server.transport.coap.CoapTestClient; |
|
|
|
import java.nio.charset.StandardCharsets; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.List; |
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
import static org.awaitility.Awaitility.await; |
|
|
|
import static org.junit.Assert.assertArrayEquals; |
|
|
|
import static org.junit.Assert.assertEquals; |
|
|
|
import static org.junit.Assert.assertNotNull; |
|
|
|
@ -235,25 +235,37 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap |
|
|
|
CoapTestCallback callbackCoap = new CoapTestCallback(1); |
|
|
|
|
|
|
|
CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); |
|
|
|
callbackCoap.getLatch().await(3, TimeUnit.SECONDS); |
|
|
|
|
|
|
|
String awaitAlias = "await Json Test Subscribe To AttributesUpdates (client.getObserveRelation)"; |
|
|
|
await(awaitAlias) |
|
|
|
.atMost(10, TimeUnit.SECONDS) |
|
|
|
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && |
|
|
|
callbackCoap.getObserve() != null && |
|
|
|
0 == callbackCoap.getObserve().intValue()); |
|
|
|
if (emptyCurrentStateNotification) { |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, "{}", 0); |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, "{}"); |
|
|
|
} else { |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION, 0); |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION); |
|
|
|
} |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
int expectedObserveCnt = callbackCoap.getObserve().intValue() + 1; |
|
|
|
int expectedObserveBeforeAddCnt = callbackCoap.getObserve().intValue() + 1; |
|
|
|
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD, expectedObserveCnt); |
|
|
|
awaitAlias = "await Json Test Subscribe To AttributesUpdates (add attributes)"; |
|
|
|
await(awaitAlias) |
|
|
|
.atMost(10, TimeUnit.SECONDS) |
|
|
|
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && |
|
|
|
callbackCoap.getObserve() != null && |
|
|
|
expectedObserveBeforeAddCnt == callbackCoap.getObserve().intValue()); |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD); |
|
|
|
|
|
|
|
latch = new CountDownLatch(1); |
|
|
|
int expectedObserveBeforeDeleteCnt = callbackCoap.getObserve().intValue() + 1; |
|
|
|
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_DELETED_RESPONSE, expectedObserveBeforeDeleteCnt); |
|
|
|
awaitAlias = "await Json Test Subscribe To AttributesUpdates (deleted attributes)"; |
|
|
|
await(awaitAlias) |
|
|
|
.atMost(10, TimeUnit.SECONDS) |
|
|
|
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && |
|
|
|
callbackCoap.getObserve() != null && |
|
|
|
expectedObserveBeforeDeleteCnt == callbackCoap.getObserve().intValue()); |
|
|
|
validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_DELETED_RESPONSE); |
|
|
|
|
|
|
|
observeRelation.proactiveCancel(); |
|
|
|
assertTrue(observeRelation.isCanceled()); |
|
|
|
@ -269,8 +281,13 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap |
|
|
|
client = new CoapTestClient(accessToken, FeatureType.ATTRIBUTES); |
|
|
|
CoapTestCallback callbackCoap = new CoapTestCallback(1); |
|
|
|
|
|
|
|
String awaitAlias = "await Proto Test Subscribe To Attributes Updates (add attributes)"; |
|
|
|
CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); |
|
|
|
callbackCoap.getLatch().await(3, TimeUnit.SECONDS); |
|
|
|
await(awaitAlias) |
|
|
|
.atMost(10, TimeUnit.SECONDS) |
|
|
|
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && |
|
|
|
callbackCoap.getObserve() != null && |
|
|
|
0 == callbackCoap.getObserve().intValue()); |
|
|
|
|
|
|
|
if (emptyCurrentStateNotification) { |
|
|
|
validateEmptyCurrentStateAttributesProtoResponse(callbackCoap); |
|
|
|
@ -278,16 +295,24 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap |
|
|
|
validateCurrentStateAttributesProtoResponse(callbackCoap); |
|
|
|
} |
|
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
int expectedObserveCnt = callbackCoap.getObserve().intValue() + 1; |
|
|
|
int expectedObserveBeforeAddCnt = callbackCoap.getObserve().intValue() + 1; |
|
|
|
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
validateUpdateProtoAttributesResponse(callbackCoap, expectedObserveCnt); |
|
|
|
awaitAlias = "await Proto Test Subscribe To Attributes Updates (add attributes)"; |
|
|
|
await(awaitAlias) |
|
|
|
.atMost(10, TimeUnit.SECONDS) |
|
|
|
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && |
|
|
|
callbackCoap.getObserve() != null && |
|
|
|
expectedObserveBeforeAddCnt == callbackCoap.getObserve().intValue()); |
|
|
|
validateUpdateProtoAttributesResponse(callbackCoap, expectedObserveBeforeAddCnt); |
|
|
|
|
|
|
|
latch = new CountDownLatch(1); |
|
|
|
int expectedObserveBeforeDeleteCnt = callbackCoap.getObserve().intValue() + 1; |
|
|
|
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); |
|
|
|
latch.await(3, TimeUnit.SECONDS); |
|
|
|
awaitAlias = "await Proto Test Subscribe To Attributes Updates (deleted attributes)"; |
|
|
|
await(awaitAlias) |
|
|
|
.atMost(10, TimeUnit.SECONDS) |
|
|
|
.until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && |
|
|
|
callbackCoap.getObserve() != null && |
|
|
|
expectedObserveBeforeDeleteCnt == callbackCoap.getObserve().intValue()); |
|
|
|
validateDeleteProtoAttributesResponse(callbackCoap, expectedObserveBeforeDeleteCnt); |
|
|
|
|
|
|
|
observeRelation.proactiveCancel(); |
|
|
|
@ -314,27 +339,18 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap |
|
|
|
assertTrue(actualSharedKeyValueProtos.containsAll(expectedSharedKeyValueProtos)); |
|
|
|
} |
|
|
|
|
|
|
|
protected void validateUpdateAttributesJsonResponse(CoapTestCallback callback, String expectedResponse, int expectedObserveCnt) { |
|
|
|
protected void validateUpdateAttributesJsonResponse(CoapTestCallback callback, String expectedResponse) { |
|
|
|
assertNotNull(callback.getPayloadBytes()); |
|
|
|
assertNotNull(callback.getObserve()); |
|
|
|
assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); |
|
|
|
assertEquals(expectedObserveCnt, callback.getObserve().intValue()); |
|
|
|
String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); |
|
|
|
assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.toJsonNode(response)); |
|
|
|
} |
|
|
|
|
|
|
|
protected void validateEmptyCurrentStateAttributesProtoResponse(CoapTestCallback callback) throws InvalidProtocolBufferException { |
|
|
|
assertArrayEquals(EMPTY_PAYLOAD, callback.getPayloadBytes()); |
|
|
|
assertNotNull(callback.getObserve()); |
|
|
|
assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); |
|
|
|
assertEquals(0, callback.getObserve().intValue()); |
|
|
|
} |
|
|
|
|
|
|
|
protected void validateCurrentStateAttributesProtoResponse(CoapTestCallback callback) throws InvalidProtocolBufferException { |
|
|
|
assertNotNull(callback.getPayloadBytes()); |
|
|
|
assertNotNull(callback.getObserve()); |
|
|
|
assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); |
|
|
|
assertEquals(0, callback.getObserve().intValue()); |
|
|
|
TransportProtos.AttributeUpdateNotificationMsg.Builder expectedCurrentStateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); |
|
|
|
TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto("sharedStr", "value", TransportProtos.KeyValueType.STRING_V); |
|
|
|
TransportProtos.TsKvProto tsKvProtoAttribute2 = getTsKvProto("sharedBool", "false", TransportProtos.KeyValueType.BOOLEAN_V); |
|
|
|
@ -359,9 +375,6 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap |
|
|
|
|
|
|
|
protected void validateUpdateProtoAttributesResponse(CoapTestCallback callback, int expectedObserveCnt) throws InvalidProtocolBufferException { |
|
|
|
assertNotNull(callback.getPayloadBytes()); |
|
|
|
assertNotNull(callback.getObserve()); |
|
|
|
assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); |
|
|
|
assertEquals(expectedObserveCnt, callback.getObserve().intValue()); |
|
|
|
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); |
|
|
|
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList("shared"); |
|
|
|
attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList); |
|
|
|
@ -378,9 +391,6 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap |
|
|
|
|
|
|
|
protected void validateDeleteProtoAttributesResponse(CoapTestCallback callback, int expectedObserveCnt) throws InvalidProtocolBufferException { |
|
|
|
assertNotNull(callback.getPayloadBytes()); |
|
|
|
assertNotNull(callback.getObserve()); |
|
|
|
assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); |
|
|
|
assertEquals(expectedObserveCnt, callback.getObserve().intValue()); |
|
|
|
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); |
|
|
|
attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); |
|
|
|
|
|
|
|
@ -395,9 +405,10 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap |
|
|
|
Awaitility.await("awaitClientAfterCancelObserve") |
|
|
|
.pollInterval(10, TimeUnit.MILLISECONDS) |
|
|
|
.atMost(5, TimeUnit.SECONDS) |
|
|
|
.until(()->{ |
|
|
|
.until(() -> { |
|
|
|
log.trace("awaiting defaultTransportService.sessions is empty"); |
|
|
|
return defaultTransportService.sessions.isEmpty();}); |
|
|
|
return defaultTransportService.sessions.isEmpty(); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private TransportProtos.GetAttributeResponseMsg getExpectedAttributeResponseMsg() { |
|
|
|
|