From c82e3d4791223901dbbf86182f7ea4bd0854e164 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 11 Sep 2023 19:30:58 +0300 Subject: [PATCH] Added support for NON CoAP messages --- .../server/transport/coap/CoapTestClient.java | 32 +- .../client/CoapClientIntegrationTest.java | 314 ++++++++++++++++++ .../callback/AbstractSyncSessionCallback.java | 1 + .../coap/callback/CoapOkCallback.java | 4 +- 4 files changed, 346 insertions(+), 5 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/transport/coap/client/CoapClientIntegrationTest.java diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestClient.java b/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestClient.java index 6c5f682a4f..3124571dc8 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestClient.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/CoapTestClient.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.transport.coap; +import lombok.Getter; import org.eclipse.californium.core.CoapClient; import org.eclipse.californium.core.CoapHandler; import org.eclipse.californium.core.CoapObserveRelation; @@ -34,7 +35,10 @@ public class CoapTestClient { private final CoapClient client; - public CoapTestClient(){ + @Getter + private CoAP.Type type = CoAP.Type.CON; + + public CoapTestClient() { this.client = createClient(); } @@ -80,9 +84,13 @@ public class CoapTestClient { return client.setTimeout(CLIENT_REQUEST_TIMEOUT).get(); } - public CoapObserveRelation getObserveRelation(CoapTestCallback callback){ + public CoapObserveRelation getObserveRelation(CoapTestCallback callback) { + return getObserveRelation(callback, true); + } + + public CoapObserveRelation getObserveRelation(CoapTestCallback callback, boolean confirmable) { Request request = Request.newGet().setObserve(); - request.setType(CoAP.Type.CON); + request.setType(confirmable ? CoAP.Type.CON : CoAP.Type.NON); return client.observe(request, callback); } @@ -94,12 +102,28 @@ public class CoapTestClient { } public void setURI(String accessToken, FeatureType featureType) { - if (featureType == null){ + if (featureType == null) { featureType = FeatureType.ATTRIBUTES; } setURI(getFeatureTokenUrl(accessToken, featureType)); } + public void useCONs() { + if (client == null) { + throw new RuntimeException("Failed to connect! CoapClient is not initialized!"); + } + type = CoAP.Type.CON; + client.useCONs(); + } + + public void useNONs() { + if (client == null) { + throw new RuntimeException("Failed to connect! CoapClient is not initialized!"); + } + type = CoAP.Type.NON; + client.useNONs(); + } + private CoapClient createClient() { return new CoapClient(); } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/client/CoapClientIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/client/CoapClientIntegrationTest.java new file mode 100644 index 0000000000..3760935d41 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/coap/client/CoapClientIntegrationTest.java @@ -0,0 +1,314 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.coap.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapHandler; +import org.eclipse.californium.core.CoapObserveRelation; +import org.eclipse.californium.core.CoapResponse; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.SingleEntityFilter; +import org.thingsboard.server.common.msg.session.FeatureType; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest; +import org.thingsboard.server.transport.coap.CoapTestCallback; +import org.thingsboard.server.transport.coap.CoapTestClient; +import org.thingsboard.server.transport.coap.CoapTestConfigProperties; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.query.EntityKeyType.CLIENT_ATTRIBUTE; +import static org.thingsboard.server.common.data.query.EntityKeyType.SHARED_ATTRIBUTE; + +@Slf4j +@DaoSqlTest +public class CoapClientIntegrationTest extends AbstractCoapIntegrationTest { + + private static final String PAYLOAD_VALUES_STR = "{\"key1\":\"value1\", \"key2\":true, \"key3\": 3.0, \"key4\": 4," + + " \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}"; + private static final List EXPECTED_KEYS = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + private static final String DEVICE_RESPONSE = "{\"value1\":\"A\",\"value2\":\"B\"}"; + + + @Before + public void beforeTest() throws Exception { + CoapTestConfigProperties configProperties = CoapTestConfigProperties.builder() + .deviceName("Test Post Attributes device") + .build(); + processBeforeTest(configProperties); + } + + @After + public void afterTest() throws Exception { + processAfterTest(); + } + + @Test + public void testConfirmableRequests() throws Exception { + boolean confirmable = true; + processAttributesTest(confirmable); + processTwoWayRpcTest(confirmable); + processTestRequestAttributesValuesFromTheServer(confirmable); + } + + @Test + public void testNonConfirmableRequests() throws Exception { + boolean confirmable = false; + processAttributesTest(confirmable); + processTwoWayRpcTest(confirmable); + processTestRequestAttributesValuesFromTheServer(confirmable); + } + + protected void processAttributesTest(boolean confirmable) throws Exception { + client = createClientForFeatureWithConfirmableParameter(FeatureType.ATTRIBUTES, confirmable); + CoapResponse coapResponse = client.postMethod(PAYLOAD_VALUES_STR.getBytes()); + assertEquals(CoAP.ResponseCode.CREATED, coapResponse.getCode()); + validateConfirmableFlag(client, coapResponse); + + DeviceId deviceId = savedDevice.getId(); + List actualKeys = getActualKeysList(deviceId); + assertNotNull(actualKeys); + + Set actualKeySet = new HashSet<>(actualKeys); + Set expectedKeySet = new HashSet<>(EXPECTED_KEYS); + assertEquals(expectedKeySet, actualKeySet); + + String attributesValuesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/attributes/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet); + ; + List> values = doGetAsyncTyped(attributesValuesUrl, new TypeReference<>() { + }); + assertAttributesValues(values, actualKeySet); + String deleteAttributesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet); + doDelete(deleteAttributesUrl); + } + + protected void processTwoWayRpcTest(boolean confirmable) throws Exception { + client = createClientForFeatureWithConfirmableParameter(FeatureType.RPC, confirmable); + CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client); + + CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap, confirmable); + String awaitAlias = "await Two Way Rpc (client.getObserveRelation)"; + await(awaitAlias) + .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + 0 == callbackCoap.getObserve()); + validateCurrentStateNotification(callbackCoap); + + String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; + String deviceId = savedDevice.getId().getId().toString(); + int expectedObserveCountAfterGpioRequest1 = callbackCoap.getObserve() + 1; + String actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); + awaitAlias = "await Two Way Rpc (setGpio(method, params, value) first"; + await(awaitAlias) + .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveCountAfterGpioRequest1 == callbackCoap.getObserve()); + validateTwoWayStateChangedNotification(callbackCoap, actualResult); + + int expectedObserveCountAfterGpioRequest2 = callbackCoap.getObserve() + 1; + actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); + awaitAlias = "await Two Way Rpc (setGpio(method, params, value) second"; + await(awaitAlias) + .atMost(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveCountAfterGpioRequest2 == callbackCoap.getObserve()); + + validateTwoWayStateChangedNotification(callbackCoap, actualResult); + + observeRelation.proactiveCancel(); + assertTrue(observeRelation.isCanceled()); + } + + protected void processTestRequestAttributesValuesFromTheServer(boolean confirmable) throws Exception { + client = createClientForFeatureWithConfirmableParameter(FeatureType.ATTRIBUTES, confirmable); + SingleEntityFilter dtf = new SingleEntityFilter(); + dtf.setSingleEntity(savedDevice.getId()); + List csKeys = getEntityKeys(CLIENT_ATTRIBUTE); + List shKeys = getEntityKeys(SHARED_ATTRIBUTE); + List keys = new ArrayList<>(); + keys.addAll(csKeys); + keys.addAll(shKeys); + getWsClient().subscribeLatestUpdate(keys, dtf); + getWsClient().registerWaitForUpdate(2); + + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", + PAYLOAD_VALUES_STR, String.class, status().isOk()); + + CoapResponse coapResponse = client.postMethod(PAYLOAD_VALUES_STR); + assertEquals(CoAP.ResponseCode.CREATED, coapResponse.getCode()); + + String update = getWsClient().waitForUpdate(); + assertThat(update).as("ws update received").isNotBlank(); + + String keysParam = String.join(",", EXPECTED_KEYS); + String featureTokenUrl = CoapTestClient.getFeatureTokenUrl(accessToken, FeatureType.ATTRIBUTES) + "?clientKeys=" + keysParam + "&sharedKeys=" + keysParam; + client.setURI(featureTokenUrl); + CoapResponse response = client.getMethod(); + validateConfirmableFlag(client, response); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + protected void assertAttributesValues(List> deviceValues, Set keySet) { + for (Map map : deviceValues) { + String key = (String) map.get("key"); + Object value = map.get("value"); + assertTrue(keySet.contains(key)); + switch (key) { + case "key1": + assertEquals("value1", value); + break; + case "key2": + assertEquals(true, value); + break; + case "key3": + assertEquals(3.0, value); + break; + case "key4": + assertEquals(4, value); + break; + case "key5": + assertNotNull(value); + assertEquals(3, ((LinkedHashMap) value).size()); + assertEquals(42, ((LinkedHashMap) value).get("someNumber")); + assertEquals(Arrays.asList(1, 2, 3), ((LinkedHashMap) value).get("someArray")); + LinkedHashMap someNestedObject = (LinkedHashMap) ((LinkedHashMap) value).get("someNestedObject"); + assertEquals("value", someNestedObject.get("key")); + break; + } + } + } + + private List getActualKeysList(DeviceId deviceId) throws Exception { + long start = System.currentTimeMillis(); + long end = System.currentTimeMillis() + 5000; + + List actualKeys = null; + while (start <= end) { + actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/attributes/CLIENT_SCOPE", new TypeReference<>() { + }); + if (actualKeys.size() == EXPECTED_KEYS.size()) { + break; + } + Thread.sleep(100); + start += 100; + } + return actualKeys; + } + + private void validateCurrentStateNotification(CoapTestCallback callback) { + assertArrayEquals(EMPTY_PAYLOAD, callback.getPayloadBytes()); + } + + private void validateConfirmableFlag(CoapTestClient client, CoapResponse response) { + CoAP.Type responseType = response.advanced().getType(); + if (CoAP.Type.CON.equals(responseType) || CoAP.Type.NON.equals(responseType)) { + assertEquals(client.getType(), responseType); + } + } + + private void validateTwoWayStateChangedNotification(CoapTestCallback callback, String actualResult) { + assertEquals(DEVICE_RESPONSE, actualResult); + assertNotNull(callback.getPayloadBytes()); + } + + protected class TestCoapCallbackForRPC extends CoapTestCallback { + + private final CoapTestClient client; + + TestCoapCallbackForRPC(CoapTestClient client) { + this.client = client; + } + + @Override + public void onLoad(CoapResponse response) { + payloadBytes = response.getPayload(); + responseCode = response.getCode(); + observe = response.getOptions().getObserve(); + validateConfirmableFlag(client, response); + if (observe != null) { + if (observe > 0) { + processOnLoadResponse(response, client); + } + } + } + + @Override + public void onError() { + log.warn("Command Response Ack Error, No connect"); + } + } + + protected void processOnLoadResponse(CoapResponse response, CoapTestClient client) { + validateConfirmableFlag(client, response); + JsonNode responseJson = JacksonUtil.fromBytes(response.getPayload()); + int requestId = responseJson.get("id").asInt(); + client.setURI(CoapTestClient.getFeatureTokenUrl(accessToken, FeatureType.RPC, requestId)); + client.postMethod(new CoapHandler() { + @Override + public void onLoad(CoapResponse response) { + validateConfirmableFlag(client, response); + log.warn("RPC {} command response ack: {}", requestId, response.getCode()); + } + + @Override + public void onError() { + log.warn("RPC {} command response ack error, no connect", requestId); + } + }, DEVICE_RESPONSE, MediaTypeRegistry.APPLICATION_JSON); + } + + private CoapTestClient createClientForFeatureWithConfirmableParameter(FeatureType featureType, boolean confirmable) { + CoapTestClient coapTestClient = new CoapTestClient(accessToken, featureType); + if (confirmable) { + coapTestClient.useCONs(); + } else { + coapTestClient.useNONs(); + } + return coapTestClient; + } + + private List getEntityKeys(EntityKeyType scope) { + return CoapClientIntegrationTest.EXPECTED_KEYS.stream().map(key -> new EntityKey(scope, key)).collect(Collectors.toList()); + } +} diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java index 6dc5cccf38..ca46d989e4 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/AbstractSyncSessionCallback.java @@ -81,6 +81,7 @@ public abstract class AbstractSyncSessionCallback implements SessionMsgListener protected void respond(Response response) { response.getOptions().setContentFormat(TbCoapContentFormatUtil.getContentFormat(exchange.getRequestOptions().getContentFormat(), state.getContentFormat())); + response.setConfirmable(exchange.advanced().getRequest().isConfirmable()); exchange.respond(response); } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapOkCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapOkCallback.java index db843a0ea3..04ce043331 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapOkCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/CoapOkCallback.java @@ -34,7 +34,9 @@ public class CoapOkCallback implements TransportServiceCallback { @Override public void onSuccess(Void msg) { - exchange.respond(new Response(onSuccessResponse)); + Response response = new Response(onSuccessResponse); + response.setConfirmable(isConRequest()); + exchange.respond(response); } @Override