diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 4d94e1051d..5af11d87f5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -317,25 +317,48 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private void handleGetAttributesRequest(TbActorCtx context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) { int requestId = request.getRequestId(); - Futures.addCallback(getAttributesKvEntries(request), new FutureCallback>>() { - @Override - public void onSuccess(@Nullable List> result) { - GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() - .setRequestId(requestId) - .addAllClientAttributeList(toTsKvProtos(result.get(0))) - .addAllSharedAttributeList(toTsKvProtos(result.get(1))) - .build(); - sendToTransport(responseMsg, sessionInfo); - } + if (request.getOnlyShared()) { + Futures.addCallback(findAllAttributesByScope(DataConstants.SHARED_SCOPE), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() + .setRequestId(requestId) + .setSharedStateMsg(true) + .addAllSharedAttributeList(toTsKvProtos(result)) + .build(); + sendToTransport(responseMsg, sessionInfo); + } - @Override - public void onFailure(Throwable t) { - GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() - .setError(t.getMessage()) - .build(); - sendToTransport(responseMsg, sessionInfo); - } - }, MoreExecutors.directExecutor()); + @Override + public void onFailure(Throwable t) { + GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() + .setError(t.getMessage()) + .setSharedStateMsg(true) + .build(); + sendToTransport(responseMsg, sessionInfo); + } + }, MoreExecutors.directExecutor()); + } else { + Futures.addCallback(getAttributesKvEntries(request), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List> result) { + GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() + .setRequestId(requestId) + .addAllClientAttributeList(toTsKvProtos(result.get(0))) + .addAllSharedAttributeList(toTsKvProtos(result.get(1))) + .build(); + sendToTransport(responseMsg, sessionInfo); + } + + @Override + public void onFailure(Throwable t) { + GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder() + .setError(t.getMessage()) + .build(); + sendToTransport(responseMsg, sessionInfo); + } + }, MoreExecutors.directExecutor()); + } } private ListenableFuture>> getAttributesKvEntries(GetAttributeRequestMsg request) { @@ -403,9 +426,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } if (hasNotificationData) { AttributeUpdateNotificationMsg finalNotification = notification.build(); - attributeSubscriptions.entrySet().forEach(sub -> { - sendToTransport(finalNotification, sub.getKey(), sub.getValue().getNodeId()); - }); + attributeSubscriptions.forEach((key, value) -> sendToTransport(finalNotification, key, value.getNodeId())); } } else { log.debug("[{}] No registered attributes subscriptions to process!", deviceId); @@ -475,7 +496,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); if (sessionIdToRemove != null) { - notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove)); + notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove), "max concurrent sessions limit reached per device!"); } } sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId()))); @@ -521,7 +542,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { notifyTransportAboutProfileUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials()); }); } else { - sessions.forEach(this::notifyTransportAboutClosedSession); + sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!")); attributeSubscriptions.clear(); rpcSubscriptions.clear(); dumpSessions(); @@ -529,11 +550,15 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd) { + private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message) { + SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto + .newBuilder() + .setMessage(message).build(); ToTransportMsg msg = ToTransportMsg.newBuilder() .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) - .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build(); + .setSessionCloseNotification(sessionCloseNotificationProto) + .build(); systemContext.getTbCoreToTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg); } @@ -741,7 +766,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sessions.remove(sessionId); rpcSubscriptions.remove(sessionId); attributeSubscriptions.remove(sessionId); - notifyTransportAboutClosedSession(sessionId, sessionMD); + notifyTransportAboutClosedSession(sessionId, sessionMD, "session timeout!"); }); if (!sessionsToRemove.isEmpty()) { dumpSessions(); diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java index faa6aae5c4..5286accd09 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesIntegrationTest.java @@ -44,6 +44,9 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr private static final String RESPONSE_ATTRIBUTES_PAYLOAD_DELETED = "{\"deleted\":[\"attribute5\"]}"; + protected static final String POST_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION = "{\"attribute1\":\"value\",\"attribute2\":false,\"attribute3\":41.0,\"attribute4\":72," + + "\"attribute5\":{\"someNumber\":41,\"someArray\":[],\"someNestedObject\":{\"key\":\"value\"}}}"; + @Before public void beforeTest() throws Exception { processBeforeTest("Test Subscribe to attribute updates", null, null); @@ -56,50 +59,85 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr @Test public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { - processTestSubscribeToAttributesUpdates(); + processTestSubscribeToAttributesUpdates(false); } - protected void processTestSubscribeToAttributesUpdates() throws Exception { + @Test + public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception { + processTestSubscribeToAttributesUpdates(true); + } + protected void processTestSubscribeToAttributesUpdates(boolean emptyCurrentStateNotification) throws Exception { + if (!emptyCurrentStateNotification) { + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION, String.class, status().isOk()); + } CoapClient client = getCoapClient(FeatureType.ATTRIBUTES); CountDownLatch latch = new CountDownLatch(1); - TestCoapCallback testCoapCallback = new TestCoapCallback(latch); + TestCoapCallback callback = new TestCoapCallback(latch); Request request = Request.newGet().setObserve(); request.setType(CoAP.Type.CON); - CoapObserveRelation observeRelation = client.observe(request, testCoapCallback); + CoapObserveRelation observeRelation = client.observe(request, callback); - Thread.sleep(1000); + latch.await(3, TimeUnit.SECONDS); + + if (emptyCurrentStateNotification) { + validateEmptyCurrentStateAttributesResponse(callback); + } else { + validateCurrentStateAttributesResponse(callback); + } + + latch = new CountDownLatch(1); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); latch.await(3, TimeUnit.SECONDS); - validateUpdateAttributesResponse(testCoapCallback); + validateUpdateAttributesResponse(callback); latch = new CountDownLatch(1); doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=attribute5", String.class); latch.await(3, TimeUnit.SECONDS); - validateDeleteAttributesResponse(testCoapCallback); + validateDeleteAttributesResponse(callback); observeRelation.proactiveCancel(); assertTrue(observeRelation.isCanceled()); } - protected void validateUpdateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { + protected void validateCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { + assertNotNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(0, callback.getObserve().intValue()); + String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); + assertEquals(JacksonUtil.toJsonNode(POST_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION), JacksonUtil.toJsonNode(response)); + } + + protected void validateEmptyCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); assertEquals(0, callback.getObserve().intValue()); String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); + assertEquals("{}", response); + } + + protected void validateUpdateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { + assertNotNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(1, callback.getObserve().intValue()); + String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); assertEquals(JacksonUtil.toJsonNode(POST_ATTRIBUTES_PAYLOAD), JacksonUtil.toJsonNode(response)); } protected void validateDeleteAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); assertNotNull(callback.getObserve()); - assertEquals(1, callback.getObserve().intValue()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(2, callback.getObserve().intValue()); String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); assertEquals(JacksonUtil.toJsonNode(RESPONSE_ATTRIBUTES_PAYLOAD_DELETED), JacksonUtil.toJsonNode(response)); } @@ -110,13 +148,18 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr private Integer observe; private byte[] payloadBytes; + private CoAP.ResponseCode responseCode; + + public Integer getObserve() { + return observe; + } public byte[] getPayloadBytes() { return payloadBytes; } - public Integer getObserve() { - return observe; + public CoAP.ResponseCode getResponseCode() { + return responseCode; } private TestCoapCallback(CountDownLatch latch) { @@ -125,10 +168,9 @@ public abstract class AbstractCoapAttributesUpdatesIntegrationTest extends Abstr @Override public void onLoad(CoapResponse response) { - assertNotNull(response.getPayload()); - assertEquals(response.getCode(), CoAP.ResponseCode.CONTENT); observe = response.getOptions().getObserve(); payloadBytes = response.getPayload(); + responseCode = response.getCode(); latch.countDown(); } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesJsonIntegrationTest.java index adfef9fbef..24cc40f5c7 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesJsonIntegrationTest.java @@ -39,4 +39,9 @@ public abstract class AbstractCoapAttributesUpdatesJsonIntegrationTest extends A public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { super.testSubscribeToAttributesUpdatesFromTheServer(); } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception { + super.testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification(); + } } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesProtoIntegrationTest.java index 378354123e..625aaf1592 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/updates/AbstractCoapAttributesUpdatesProtoIntegrationTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.coap.attributes.updates; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -24,11 +25,15 @@ import org.thingsboard.server.common.data.CoapDeviceType; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.gen.transport.TransportProtos; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @Slf4j @@ -46,11 +51,54 @@ public abstract class AbstractCoapAttributesUpdatesProtoIntegrationTest extends @Test public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception { - processTestSubscribeToAttributesUpdates(); + processTestSubscribeToAttributesUpdates(false); + } + + @Test + public void testSubscribeToAttributesUpdatesFromTheServerWithEmptyCurrentStateNotification() throws Exception { + processTestSubscribeToAttributesUpdates(true); + } + + protected void validateCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { + assertNotNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(0, callback.getObserve().intValue()); + TransportProtos.AttributeUpdateNotificationMsg.Builder expectedCurrentStateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); + TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto("attribute1", "value", TransportProtos.KeyValueType.STRING_V); + TransportProtos.TsKvProto tsKvProtoAttribute2 = getTsKvProto("attribute2", "false", TransportProtos.KeyValueType.BOOLEAN_V); + TransportProtos.TsKvProto tsKvProtoAttribute3 = getTsKvProto("attribute3", "41.0", TransportProtos.KeyValueType.DOUBLE_V); + TransportProtos.TsKvProto tsKvProtoAttribute4 = getTsKvProto("attribute4", "72", TransportProtos.KeyValueType.LONG_V); + TransportProtos.TsKvProto tsKvProtoAttribute5 = getTsKvProto("attribute5", "{\"someNumber\":41,\"someArray\":[],\"someNestedObject\":{\"key\":\"value\"}}", TransportProtos.KeyValueType.JSON_V); + List tsKvProtoList = new ArrayList<>(); + tsKvProtoList.add(tsKvProtoAttribute1); + tsKvProtoList.add(tsKvProtoAttribute2); + tsKvProtoList.add(tsKvProtoAttribute3); + tsKvProtoList.add(tsKvProtoAttribute4); + tsKvProtoList.add(tsKvProtoAttribute5); + TransportProtos.AttributeUpdateNotificationMsg expectedCurrentStateNotificationMsg = expectedCurrentStateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList).build(); + TransportProtos.AttributeUpdateNotificationMsg actualCurrentStateNotificationMsg = TransportProtos.AttributeUpdateNotificationMsg.parseFrom(callback.getPayloadBytes()); + + List expectedSharedUpdatedList = expectedCurrentStateNotificationMsg.getSharedUpdatedList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); + List actualSharedUpdatedList = actualCurrentStateNotificationMsg.getSharedUpdatedList().stream().map(TransportProtos.TsKvProto::getKv).collect(Collectors.toList()); + + assertEquals(expectedSharedUpdatedList.size(), actualSharedUpdatedList.size()); + assertTrue(actualSharedUpdatedList.containsAll(expectedSharedUpdatedList)); + + } + + protected void validateEmptyCurrentStateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { + assertNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(0, callback.getObserve().intValue()); } protected void validateUpdateAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(1, callback.getObserve().intValue()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); List tsKvProtoList = getTsKvProtoList(); attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList); @@ -68,6 +116,9 @@ public abstract class AbstractCoapAttributesUpdatesProtoIntegrationTest extends protected void validateDeleteAttributesResponse(TestCoapCallback callback) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(2, callback.getObserve().intValue()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); attributeUpdateNotificationMsgBuilder.addSharedDeleted("attribute5"); diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcDefaultIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcDefaultIntegrationTest.java index a4210aa31a..3c47b2497e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcDefaultIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcDefaultIntegrationTest.java @@ -83,7 +83,7 @@ public abstract class AbstractCoapServerSideRpcDefaultIntegrationTest extends Ab @Test public void testServerCoapTwoWayRpc() throws Exception { - processTwoWayRpcTest(); + processTwoWayRpcTest("{\"value1\":\"A\",\"value2\":\"B\"}"); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java index d5d56db63b..a4dba5bb74 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.transport.coap.rpc; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.eclipse.californium.core.CoapClient; @@ -24,17 +25,18 @@ import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.Request; -import org.junit.Assert; -import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.CoapDeviceType; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.msg.session.FeatureType; +import org.thingsboard.server.transport.coap.AbstractCoapIntegrationTest; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -55,57 +57,66 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC client.useCONs(); CountDownLatch latch = new CountDownLatch(1); - TestCoapCallback testCoapCallback = new TestCoapCallback(client, latch, true); + TestCoapCallback callback = new TestCoapCallback(client, latch, true); Request request = Request.newGet().setObserve(); - CoapObserveRelation observeRelation = client.observe(request, testCoapCallback); + CoapObserveRelation observeRelation = client.observe(request, callback); + + latch.await(3, TimeUnit.SECONDS); + + validateCurrentStateNotification(callback); + + latch = new CountDownLatch(1); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); String result = doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); - Assert.assertTrue(StringUtils.isEmpty(result)); + latch.await(3, TimeUnit.SECONDS); - assertEquals(0, testCoapCallback.getObserve().intValue()); + + validateOneWayStateChangedNotification(callback, result); + observeRelation.proactiveCancel(); assertTrue(observeRelation.isCanceled()); } - protected void processTwoWayRpcTest() throws Exception { + protected void processTwoWayRpcTest(String expectedResponseResult) throws Exception { CoapClient client = getCoapClient(FeatureType.RPC); client.useCONs(); CountDownLatch latch = new CountDownLatch(1); - TestCoapCallback testCoapCallback = new TestCoapCallback(client, latch, false); + TestCoapCallback callback = new TestCoapCallback(client, latch, false); Request request = Request.newGet().setObserve(); request.setType(CoAP.Type.CON); - CoapObserveRelation observeRelation = client.observe(request, testCoapCallback); + CoapObserveRelation observeRelation = client.observe(request, callback); + + latch.await(3, TimeUnit.SECONDS); + + validateCurrentStateNotification(callback); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); - String expected = "{\"value1\":\"A\",\"value2\":\"B\"}"; + String actualResult = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); + latch.await(3, TimeUnit.SECONDS); + + validateTwoWayStateChangedNotification(callback, 1, expectedResponseResult, actualResult); - String result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); + latch = new CountDownLatch(1); + + actualResult = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); latch.await(3, TimeUnit.SECONDS); - assertEquals(expected, result); - assertEquals(0, testCoapCallback.getObserve().intValue()); + validateTwoWayStateChangedNotification(callback, 2, expectedResponseResult, actualResult); + observeRelation.proactiveCancel(); assertTrue(observeRelation.isCanceled()); - -// // TODO: 3/11/21 Fix test to validate next RPC -// latch = new CountDownLatch(1); -// -// result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); -// latch.await(3, TimeUnit.SECONDS); -// -// assertEquals(expected, result); -// assertEquals(1, testCoapCallback.getObserve().intValue()); } protected void processOnLoadResponse(CoapResponse response, CoapClient client, Integer observe, CountDownLatch latch) { - client.setURI(getRpcResponseFeatureTokenUrl(accessToken, observe)); + JsonNode responseJson = JacksonUtil.fromBytes(response.getPayload()); + client.setURI(getRpcResponseFeatureTokenUrl(accessToken, responseJson.get("id").asInt())); client.post(new CoapHandler() { @Override public void onLoad(CoapResponse response) { @@ -130,11 +141,21 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC private final CountDownLatch latch; private final boolean isOneWayRpc; + private Integer observe; + private byte[] payloadBytes; + private CoAP.ResponseCode responseCode; + public Integer getObserve() { return observe; } - private Integer observe; + public byte[] getPayloadBytes() { + return payloadBytes; + } + + public CoAP.ResponseCode getResponseCode() { + return responseCode; + } TestCoapCallback(CoapClient client, CountDownLatch latch, boolean isOneWayRpc) { this.client = client; @@ -144,14 +165,15 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC @Override public void onLoad(CoapResponse response) { - log.warn("coap response: {}, {}", response.getResponseText(), response.getCode()); - assertNotNull(response.getPayload()); - assertEquals(response.getCode(), CoAP.ResponseCode.CONTENT); + payloadBytes = response.getPayload(); + responseCode = response.getCode(); observe = response.getOptions().getObserve(); - if (!isOneWayRpc) { - processOnLoadResponse(response, client, observe, latch); - } else { - latch.countDown(); + if (observe != null) { + if (!isOneWayRpc && observe > 0) { + processOnLoadResponse(response, client, observe, latch); + } else { + latch.countDown(); + } } } @@ -162,4 +184,28 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC } + private void validateCurrentStateNotification(TestCoapCallback callback) { + assertNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode.VALID); + assertEquals(0, callback.getObserve().intValue()); + } + + private void validateOneWayStateChangedNotification(TestCoapCallback callback, String result) { + assertTrue(StringUtils.isEmpty(result)); + assertNotNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(1, callback.getObserve().intValue()); + } + + private void validateTwoWayStateChangedNotification(TestCoapCallback callback, int expectedObserveNumber, String expectedResult, String actualResult) { + assertEquals(expectedResult, actualResult); + assertNotNull(callback.getPayloadBytes()); + assertNotNull(callback.getObserve()); + assertEquals(callback.getResponseCode(), CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + assertEquals(expectedObserveNumber, callback.getObserve().intValue()); + } + + } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcJsonIntegrationTest.java index cecdb1fae0..3aa1dec074 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcJsonIntegrationTest.java @@ -42,7 +42,7 @@ public abstract class AbstractCoapServerSideRpcJsonIntegrationTest extends Abstr @Test public void testServerCoapTwoWayRpc() throws Exception { - processTwoWayRpcTest(); + processTwoWayRpcTest("{\"value1\":\"A\",\"value2\":\"B\"}"); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcProtoIntegrationTest.java index 706a560024..544090d7c8 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcProtoIntegrationTest.java @@ -85,37 +85,11 @@ public abstract class AbstractCoapServerSideRpcProtoIntegrationTest extends Abst @Test public void testServerCoapTwoWayRpc() throws Exception { - processTwoWayRpcTest(); - } - - protected void processTwoWayRpcTest() throws Exception { - CoapClient client = getCoapClient(FeatureType.RPC); - client.useCONs(); - - CountDownLatch latch = new CountDownLatch(1); - TestCoapCallback testCoapCallback = new TestCoapCallback(client, latch, false); - - Request request = Request.newGet().setObserve(); - request.setType(CoAP.Type.CON); - CoapObserveRelation observeRelation = client.observe(request, testCoapCallback); - - String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; - String deviceId = savedDevice.getId().getId().toString(); - - String expected = "{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}"; - - String result = doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); - latch.await(3, TimeUnit.SECONDS); - - assertEquals(expected, result); - assertEquals(0, testCoapCallback.getObserve().intValue()); - observeRelation.proactiveCancel(); - assertTrue(observeRelation.isCanceled()); + processTwoWayRpcTest("{\"payload\":\"{\\\"value1\\\":\\\"A\\\",\\\"value2\\\":\\\"B\\\"}\"}"); } @Override protected void processOnLoadResponse(CoapResponse response, CoapClient client, Integer observe, CountDownLatch latch) { - client.setURI(getRpcResponseFeatureTokenUrl(accessToken, observe)); ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = getProtoTransportPayloadConfiguration(); ProtoFileElement rpcRequestProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(RPC_REQUEST_PROTO_SCHEMA); DynamicSchema rpcRequestProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcRequestProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_REQUEST_PROTO_SCHEMA); @@ -123,25 +97,22 @@ public abstract class AbstractCoapServerSideRpcProtoIntegrationTest extends Abst byte[] requestPayload = response.getPayload(); DynamicMessage.Builder rpcRequestMsg = rpcRequestProtoSchema.newMessageBuilder("RpcRequestMsg"); Descriptors.Descriptor rpcRequestMsgDescriptor = rpcRequestMsg.getDescriptorForType(); - assertNotNull(rpcRequestMsgDescriptor); try { DynamicMessage dynamicMessage = DynamicMessage.parseFrom(rpcRequestMsgDescriptor, requestPayload); - List fields = rpcRequestMsgDescriptor.getFields(); - for (Descriptors.FieldDescriptor fieldDescriptor: fields) { - assertTrue(dynamicMessage.hasField(fieldDescriptor)); - } + Descriptors.FieldDescriptor requestIdDescriptor = rpcRequestMsgDescriptor.findFieldByName("requestId"); + int requestId = (int) dynamicMessage.getField(requestIdDescriptor); ProtoFileElement rpcResponseProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_RPC_RESPONSE_PROTO_SCHEMA); DynamicSchema rpcResponseProtoSchema = protoTransportPayloadConfiguration.getDynamicSchema(rpcResponseProtoSchemaFile, ProtoTransportPayloadConfiguration.RPC_RESPONSE_PROTO_SCHEMA); DynamicMessage.Builder rpcResponseBuilder = rpcResponseProtoSchema.newMessageBuilder("RpcResponseMsg"); Descriptors.Descriptor rpcResponseMsgDescriptor = rpcResponseBuilder.getDescriptorForType(); - assertNotNull(rpcResponseMsgDescriptor); DynamicMessage rpcResponseMsg = rpcResponseBuilder .setField(rpcResponseMsgDescriptor.findFieldByName("payload"), DEVICE_RESPONSE) .build(); + client.setURI(getRpcResponseFeatureTokenUrl(accessToken, requestId)); client.post(new CoapHandler() { @Override public void onLoad(CoapResponse response) { - log.warn("Command Response Ack: {}, {}", response.getCode(), response.getResponseText()); + log.warn("Command Response Ack: {}", response.getCode()); latch.countDown(); } diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 74363ed814..b4d929f60c 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -137,6 +137,7 @@ message GetAttributeRequestMsg { int32 requestId = 1; repeated string clientAttributeNames = 2; repeated string sharedAttributeNames = 3; + bool onlyShared = 4; } message GetAttributeResponseMsg { @@ -144,6 +145,7 @@ message GetAttributeResponseMsg { repeated TsKvProto clientAttributeList = 2; repeated TsKvProto sharedAttributeList = 3; string error = 5; + bool sharedStateMsg = 6; } message AttributeUpdateNotificationMsg { diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java index 85a4f5ba70..7ab1306953 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/AbstractCoapTransportResource.java @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.server.resources.CoapExchange; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.transport.TransportContext; @@ -56,7 +57,7 @@ public abstract class AbstractCoapTransportResource extends CoapResource { protected abstract void processHandlePost(CoapExchange exchange); - protected void reportActivity(TransportProtos.SessionInfoProto sessionInfo, boolean hasAttributeSubscription, boolean hasRpcSubscription) { + protected void reportSubscriptionInfo(TransportProtos.SessionInfoProto sessionInfo, boolean hasAttributeSubscription, boolean hasRpcSubscription) { transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() .setAttributeSubscription(hasAttributeSubscription) .setRpcSubscription(hasRpcSubscription) @@ -64,6 +65,10 @@ public abstract class AbstractCoapTransportResource extends CoapResource { .build(), TransportServiceCallback.EMPTY); } + protected void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { + transportService.reportActivity(sessionInfo); + } + protected static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) { return TransportProtos.SessionEventMsg.newBuilder() .setSessionType(TransportProtos.SessionType.ASYNC) @@ -112,13 +117,19 @@ public abstract class AbstractCoapTransportResource extends CoapResource { @Override public void onSuccess(Void msg) { - exchange.respond(onSuccessResponse); + Response response = new Response(onSuccessResponse); + response.setAcknowledged(isConRequest()); + exchange.respond(response); } @Override public void onError(Throwable e) { exchange.respond(onFailureResponse); } + + private boolean isConRequest() { + return exchange.advanced().getRequest().isConfirmable(); + } } public static class CoapNoOpCallback implements TransportServiceCallback { diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportContext.java index 9133809225..6dc0b6540a 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportContext.java @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.coap; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.TransportContext; @@ -34,6 +35,10 @@ import org.thingsboard.server.transport.coap.efento.adaptor.EfentoCoapAdaptor; @Component public class CoapTransportContext extends TransportContext { + @Getter + @Value("${transport.sessions.report_timeout}") + private long sessionReportTimeout; + @Getter @Autowired private JsonCoapAdaptor jsonCoapAdaptor; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index e287a28aa0..47cf44f6fa 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -28,6 +28,7 @@ import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.server.resources.CoapExchange; import org.eclipse.californium.core.server.resources.Resource; import org.eclipse.californium.core.server.resources.ResourceObserver; +import org.springframework.util.CollectionUtils; import org.thingsboard.server.coapserver.CoapServerService; import org.thingsboard.server.coapserver.TbCoapDtlsSessionInfo; import org.thingsboard.server.common.data.DataConstants; @@ -55,11 +56,14 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @@ -72,25 +76,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4; private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID"; - private final ConcurrentMap tokenToSessionIdMap = new ConcurrentHashMap<>(); - private final ConcurrentMap tokenToNotificationCounterMap = new ConcurrentHashMap<>(); + private final ConcurrentMap tokenToSessionInfoMap = new ConcurrentHashMap<>(); + private final ConcurrentMap tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>(); + private final ConcurrentMap sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); private final Set rpcSubscriptions = ConcurrentHashMap.newKeySet(); private final Set attributeSubscriptions = ConcurrentHashMap.newKeySet(); private ConcurrentMap dtlsSessionIdMap; private long timeout; + private long sessionReportTimeout; - public CoapTransportResource(CoapTransportContext coapTransportContext, CoapServerService coapServerService, String name) { - super(coapTransportContext, name); + public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) { + super(ctx, name); this.setObservable(true); // enable observing this.addObserver(new CoapResourceObserver()); this.dtlsSessionIdMap = coapServerService.getDtlsSessionsMap(); this.timeout = coapServerService.getTimeout(); -// this.setObservable(false); // disable observing -// this.setObserveType(CoAP.Type.CON); // configure the notification type to CONs -// this.getAttributes().setObservable(); // mark observable in the Link-Format + this.sessionReportTimeout = ctx.getSessionReportTimeout(); + ctx.getScheduler().scheduleAtFixedRate(() -> { + Set observeSessions = sessionInfoToObserveRelationMap.keySet(); + observeSessions.forEach(this::reportActivity); + }, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); } + @Override public void checkObserveRelation(Exchange exchange, Response response) { String token = getTokenFromRequest(exchange.getRequest()); final ObserveRelation relation = exchange.getRelation(); @@ -103,11 +112,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource { relation.setEstablished(); addObserveRelation(relation); } - AtomicInteger notificationCounter = tokenToNotificationCounterMap.computeIfAbsent(token, s -> new AtomicInteger(0)); + AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0)); response.getOptions().setObserve(notificationCounter.getAndIncrement()); } // ObserveLayer takes care of the else case } + public void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { + relation.cancel(); + relation.getExchange().sendResponse(new Response(code)); + } + + public Map getSessionInfoToObserveRelationMap() { + return sessionInfoToObserveRelationMap; + } + @Override protected void processHandleGet(CoapExchange exchange) { Optional featureType = getFeatureType(exchange.advanced().getRequest()); @@ -239,7 +257,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } private void processRequest(CoapExchange exchange, SessionMsgType type, Request request, TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) { - UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + UUID sessionId = toSessionId(sessionInfo); try { TransportConfigurationContainer transportConfigurationContainer = getTransportConfigurationContainer(deviceProfile); CoapTransportAdaptor coapTransportAdaptor = getCoapTransportAdaptor(transportConfigurationContainer.isJsonPayload()); @@ -249,14 +267,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { coapTransportAdaptor.convertToPostAttributes(sessionId, request, transportConfigurationContainer.getAttributesMsgDescriptor()), new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - reportActivity(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); + reportSubscriptionInfo(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); break; case POST_TELEMETRY_REQUEST: transportService.process(sessionInfo, coapTransportAdaptor.convertToPostTelemetry(sessionId, request, transportConfigurationContainer.getTelemetryMsgDescriptor()), new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - reportActivity(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); + reportSubscriptionInfo(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); break; case CLAIM_REQUEST: transportService.process(sessionInfo, @@ -264,49 +282,52 @@ public class CoapTransportResource extends AbstractCoapTransportResource { new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); break; case SUBSCRIBE_ATTRIBUTES_REQUEST: - TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionIdMap.get(getTokenFromRequest(request)); + TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); if (currentAttrSession == null) { attributeSubscriptions.add(sessionId); registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange)); + transportService.process(sessionInfo, + TransportProtos.GetAttributeRequestMsg.newBuilder().setOnlyShared(true).build(), + new CoapNoOpCallback(exchange)); } break; case UNSUBSCRIBE_ATTRIBUTES_REQUEST: TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); if (attrSession != null) { - UUID attrSessionId = new UUID(attrSession.getSessionIdMSB(), attrSession.getSessionIdLSB()); + UUID attrSessionId = toSessionId(attrSession); attributeSubscriptions.remove(attrSessionId); + sessionInfoToObserveRelationMap.remove(attrSession); transportService.process(attrSession, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - closeAndDeregister(sessionInfo, sessionId); + closeAndDeregister(sessionInfo); } break; case SUBSCRIBE_RPC_COMMANDS_REQUEST: - TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionIdMap.get(getTokenFromRequest(request)); + TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); if (currentRpcSession == null) { rpcSubscriptions.add(sessionId); registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), - new CoapNoOpCallback(exchange)); - } else { - UUID rpcSessionId = new UUID(currentRpcSession.getSessionIdMSB(), currentRpcSession.getSessionIdLSB()); - reportActivity(currentRpcSession, attributeSubscriptions.contains(rpcSessionId), rpcSubscriptions.contains(rpcSessionId)); + new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) + ); } break; case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); if (rpcSession != null) { - UUID rpcSessionId = new UUID(rpcSession.getSessionIdMSB(), rpcSession.getSessionIdLSB()); + UUID rpcSessionId = toSessionId(rpcSession); rpcSubscriptions.remove(rpcSessionId); + sessionInfoToObserveRelationMap.remove(rpcSession); transportService.process(rpcSession, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - closeAndDeregister(sessionInfo, sessionId); + closeAndDeregister(sessionInfo); } break; case TO_DEVICE_RPC_RESPONSE: @@ -341,6 +362,10 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } } + private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) { + return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); + } + private void getFirmwareCallback(TransportProtos.SessionInfoProto sessionInfo, CoapExchange exchange, FirmwareType firmwareType) { TransportProtos.GetFirmwareRequestMsg requestMsg = TransportProtos.GetFirmwareRequestMsg.newBuilder() .setTenantIdMSB(sessionInfo.getTenantIdMSB()) @@ -352,18 +377,18 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) { - tokenToNotificationCounterMap.remove(token); - return tokenToSessionIdMap.remove(token); + tokenToObserveNotificationSeqMap.remove(token); + return tokenToSessionInfoMap.remove(token); } private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { - tokenToSessionIdMap.putIfAbsent(token, sessionInfo); + tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder)); transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); } private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { - return new CoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder); + return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder); } private String getTokenFromRequest(Request request) { @@ -481,11 +506,13 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private static class CoapSessionListener implements SessionMsgListener { + private final CoapTransportResource coapTransportResource; private final CoapExchange exchange; private final CoapTransportAdaptor coapTransportAdaptor; private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; - CoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { + CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { + this.coapTransportResource = coapTransportResource; this.exchange = exchange; this.coapTransportAdaptor = coapTransportAdaptor; this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder; @@ -494,7 +521,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @Override public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) { try { - exchange.respond(coapTransportAdaptor.convertToPublish(msg)); + exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); @@ -512,8 +539,21 @@ public class CoapTransportResource extends AbstractCoapTransportResource { } @Override - public void onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { - exchange.respond(CoAP.ResponseCode.SERVICE_UNAVAILABLE); + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); + Map sessionToObserveRelationMap = coapTransportResource.getSessionInfoToObserveRelationMap(); + if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { + Set observeSessions = sessionToObserveRelationMap.keySet(); + Optional observeSessionToClose = observeSessions.stream().filter(sessionInfoProto -> { + UUID observeSessionId = new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); + return observeSessionId.equals(sessionId); + }).findFirst(); + if (observeSessionToClose.isPresent()) { + TransportProtos.SessionInfoProto sessionInfoProto = observeSessionToClose.get(); + ObserveRelation observeRelation = sessionToObserveRelationMap.get(sessionInfoProto); + coapTransportResource.clearAndNotifyObserveRelation(observeRelation, CoAP.ResponseCode.SERVICE_UNAVAILABLE); + } + } } @Override @@ -529,7 +569,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @Override public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg msg) { try { - exchange.respond(coapTransportAdaptor.convertToPublish(msg)); + exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); @@ -561,29 +601,29 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @Override public void addedObserveRelation(ObserveRelation relation) { - if (log.isTraceEnabled()) { - Request request = relation.getExchange().getRequest(); - log.trace("Added Observe relation for token: {}", getTokenFromRequest(request)); - } + Request request = relation.getExchange().getRequest(); + String token = getTokenFromRequest(request); + sessionInfoToObserveRelationMap.putIfAbsent(tokenToSessionInfoMap.get(token), relation); + log.trace("Added Observe relation for token: {}", token); } @Override public void removedObserveRelation(ObserveRelation relation) { Request request = relation.getExchange().getRequest(); - String tokenFromRequest = getTokenFromRequest(request); - log.trace("Relation removed for token: {}", tokenFromRequest); - TransportProtos.SessionInfoProto sessionInfoToRemove = lookupAsyncSessionInfo(tokenFromRequest); - if (sessionInfoToRemove != null) { - closeAndDeregister(sessionInfoToRemove, new UUID(sessionInfoToRemove.getSessionIdMSB(), sessionInfoToRemove.getDeviceIdLSB())); - } + String token = getTokenFromRequest(request); + TransportProtos.SessionInfoProto session = tokenToSessionInfoMap.get(token); + sessionInfoToObserveRelationMap.remove(session); + log.trace("Relation removed for token: {}", token); } } - private void closeAndDeregister(TransportProtos.SessionInfoProto session, UUID sessionId) { + private void closeAndDeregister(TransportProtos.SessionInfoProto session) { + UUID sessionId = toSessionId(session); transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); transportService.deregisterSession(session); rpcSubscriptions.remove(sessionId); attributeSubscriptions.remove(sessionId); + sessionInfoToObserveRelationMap.remove(session); } private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException { diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java index a739563858..ce1618fe99 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; import org.eclipse.californium.core.CoapServer; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.TbTransportService; import org.thingsboard.server.coapserver.CoapServerService; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java index f7572d0506..f94547b840 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java @@ -40,6 +40,7 @@ public class CoapAdaptorUtils { result.addAllSharedAttributeNames(sharedKeys); } } + result.setOnlyShared(false); return result.build(); } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java index b30c86307b..00374f6916 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java @@ -39,13 +39,13 @@ public interface CoapTransportAdaptor { TransportProtos.ClaimDeviceMsg convertToClaimDevice(UUID sessionId, Request inbound, TransportProtos.SessionInfoProto sessionInfo) throws AdaptorException; - Response convertToPublish(TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException; + Response convertToPublish(boolean isConfirmable, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException; Response convertToPublish(boolean isConfirmable, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; Response convertToPublish(boolean isConfirmable, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) throws AdaptorException; - Response convertToPublish(TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException; + Response convertToPublish(boolean isConfirmable, TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException; ProvisionDeviceRequestMsg convertToProvisionRequestMsg(UUID sessionId, Request inbound) throws AdaptorException; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java index 85d35301c3..4b4369f222 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java @@ -26,12 +26,14 @@ import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.CoapTransportResource; +import java.util.List; import java.util.Optional; import java.util.UUID; @@ -101,10 +103,11 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { } @Override - public Response convertToPublish(TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException { + public Response convertToPublish(boolean isConfirmable, TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException { Response response = new Response(CoAP.ResponseCode.CONTENT); JsonElement result = JsonConverter.toJson(msg); response.setPayload(result.toString()); + response.setAcknowledged(isConfirmable); return response; } @@ -119,21 +122,35 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { } @Override - public Response convertToPublish(TransportProtos.GetAttributeResponseMsg msg) throws AdaptorException { - if (msg.getClientAttributeListCount() == 0 && msg.getSharedAttributeListCount() == 0) { - return new Response(CoAP.ResponseCode.NOT_FOUND); + public Response convertToPublish(boolean isConfirmable, TransportProtos.GetAttributeResponseMsg msg) throws AdaptorException { + if (msg.getSharedStateMsg()) { + if (StringUtils.isEmpty(msg.getError())) { + Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + response.setAcknowledged(isConfirmable); + TransportProtos.AttributeUpdateNotificationMsg notificationMsg = TransportProtos.AttributeUpdateNotificationMsg.newBuilder().addAllSharedUpdated(msg.getSharedAttributeListList()).build(); + JsonObject result = JsonConverter.toJson(notificationMsg); + response.setPayload(result.toString()); + return response; + } else { + return new Response(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + } } else { - Response response = new Response(CoAP.ResponseCode.CONTENT); - JsonObject result = JsonConverter.toJson(msg); - response.setPayload(result.toString()); - return response; + if (msg.getClientAttributeListCount() == 0 && msg.getSharedAttributeListCount() == 0) { + return new Response(CoAP.ResponseCode.NOT_FOUND); + } else { + Response response = new Response(CoAP.ResponseCode.CONTENT); + response.setAcknowledged(isConfirmable); + JsonObject result = JsonConverter.toJson(msg); + response.setPayload(result.toString()); + return response; + } } } private Response getObserveNotification(boolean confirmable, JsonElement json) { - Response response = new Response(CoAP.ResponseCode.CONTENT); + Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); response.setPayload(json.toString()); - response.setConfirmable(confirmable); + response.setAcknowledged(confirmable); return response; } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java index 120e49f697..2ab377611c 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.coap.adaptors; import com.google.gson.JsonElement; +import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -26,6 +27,7 @@ import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; @@ -118,27 +120,41 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor { } @Override - public Response convertToPublish(TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException { + public Response convertToPublish(boolean isConfirmable, TransportProtos.ToServerRpcResponseMsg msg) throws AdaptorException { Response response = new Response(CoAP.ResponseCode.CONTENT); + response.setAcknowledged(isConfirmable); response.setPayload(msg.toByteArray()); return response; } @Override - public Response convertToPublish(TransportProtos.GetAttributeResponseMsg msg) throws AdaptorException { - if (msg.getClientAttributeListCount() == 0 && msg.getSharedAttributeListCount() == 0) { - return new Response(CoAP.ResponseCode.NOT_FOUND); + public Response convertToPublish(boolean isConfirmable, TransportProtos.GetAttributeResponseMsg msg) throws AdaptorException { + if (msg.getSharedStateMsg()) { + if (StringUtils.isEmpty(msg.getError())) { + Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); + response.setAcknowledged(isConfirmable); + TransportProtos.AttributeUpdateNotificationMsg notificationMsg = TransportProtos.AttributeUpdateNotificationMsg.newBuilder().addAllSharedUpdated(msg.getSharedAttributeListList()).build(); + response.setPayload(notificationMsg.toByteArray()); + return response; + } else { + return new Response(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + } } else { - Response response = new Response(CoAP.ResponseCode.CONTENT); - response.setPayload(msg.toByteArray()); - return response; + if (msg.getClientAttributeListCount() == 0 && msg.getSharedAttributeListCount() == 0) { + return new Response(CoAP.ResponseCode.NOT_FOUND); + } else { + Response response = new Response(CoAP.ResponseCode.CONTENT); + response.setAcknowledged(isConfirmable); + response.setPayload(msg.toByteArray()); + return response; + } } } private Response getObserveNotification(boolean confirmable, byte[] notification) { - Response response = new Response(CoAP.ResponseCode.CONTENT); + Response response = new Response(CoAP.ResponseCode._UNKNOWN_SUCCESS_CODE); response.setPayload(notification); - response.setConfirmable(confirmable); + response.setAcknowledged(confirmable); return response; } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/NoSecObserveClient.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/NoSecObserveClient.java new file mode 100644 index 0000000000..3340fdd61b --- /dev/null +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/NoSecObserveClient.java @@ -0,0 +1,103 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.coap.client; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapHandler; +import org.eclipse.californium.core.CoapObserveRelation; +import org.eclipse.californium.core.CoapResponse; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.Request; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class NoSecObserveClient { + + private static final long INFINIT_EXCHANGE_LIFETIME = 0L; + + private CoapClient coapClient; + private CoapObserveRelation observeRelation; + private ExecutorService executor = Executors.newFixedThreadPool(1); + private CountDownLatch latch; + + public NoSecObserveClient(String host, int port, String accessToken) throws URISyntaxException { + URI uri = new URI(getFutureUrl(host, port, accessToken)); + this.coapClient = new CoapClient(uri); + coapClient.setTimeout(INFINIT_EXCHANGE_LIFETIME); + this.latch = new CountDownLatch(5); + } + + public void start() { + executor.submit(() -> { + try { + Request request = Request.newGet(); + request.setObserve(); + observeRelation = coapClient.observe(request, new CoapHandler() { + @Override + public void onLoad(CoapResponse response) { + String responseText = response.getResponseText(); + CoAP.ResponseCode code = response.getCode(); + Integer observe = response.getOptions().getObserve(); + log.info("CoAP Response received! " + + "responseText: {}, " + + "code: {}, " + + "observe seq number: {}", + responseText, + code, + observe); + latch.countDown(); + } + + @Override + public void onError() { + log.error("Ack error!"); + latch.countDown(); + } + }); + } catch (Exception e) { + log.error("Error occurred while sending COAP requests: "); + } + }); + try { + latch.await(); + observeRelation.proactiveCancel(); + } catch (InterruptedException e) { + log.error("Error occurred: ", e); + } + } + + private String getFutureUrl(String host, Integer port, String accessToken) { + return "coap://" + host + ":" + port + "/api/v1/" + accessToken + "/attributes"; + } + + public static void main(String[] args) throws URISyntaxException { + log.info("Usage: java -cp ... org.thingsboard.server.transport.coap.client.NoSecObserveClient " + + "host port accessToken"); + + String host = args[0]; + int port = Integer.parseInt(args[1]); + String accessToken = args[2]; + + final NoSecObserveClient client = new NoSecObserveClient(host, port, accessToken); + client.start(); + } +} diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java index 03ed92da58..d53cd7e49e 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java @@ -87,7 +87,7 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource { transportService.process(sessionInfo, transportContext.getEfentoCoapAdaptor().convertToPostTelemetry(sessionId, efentoMeasurements), new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); - reportActivity(sessionInfo, false, false); + reportSubscriptionInfo(sessionInfo, false, false); } catch (AdaptorException e) { log.error("[{}] Failed to decode Efento ProtoMeasurements: ", sessionId, e); exchange.respond(CoAP.ResponseCode.BAD_REQUEST); diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index bb9499ae29..31180b478d 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -391,7 +391,8 @@ public class DeviceApiController implements TbTransportService { } @Override - public void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification) { + public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) { + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java index 5d6486d9a2..b71de7db1b 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java @@ -32,6 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseM import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import java.util.Optional; +import java.util.UUID; @Slf4j public class LwM2mSessionMsgListener implements GenericFutureListener>, SessionMsgListener { @@ -54,8 +55,8 @@ public class LwM2mSessionMsgListener implements GenericFutureListener sessionMD.getLastReportedActivityTime()) { final long lastActivityTimeFinal = lastActivityTime; @@ -683,10 +687,13 @@ public class DefaultTransportService implements TransportService { @Override public SessionMetaData registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) { SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener); - sessions.putIfAbsent(toSessionId(sessionInfo), currentSession); + UUID sessionId = toSessionId(sessionInfo); + sessions.putIfAbsent(sessionId, currentSession); + + TransportProtos.SessionCloseNotificationProto notification = TransportProtos.SessionCloseNotificationProto.newBuilder().setMessage("session timeout!").build(); ScheduledFuture executorFuture = scheduler.schedule(() -> { - listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); + listener.onRemoteSessionCloseCommand(sessionId, notification); deregisterSession(sessionInfo); }, timeout, TimeUnit.MILLISECONDS); @@ -739,7 +746,7 @@ public class DefaultTransportService implements TransportService { listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); } if (toSessionMsg.hasSessionCloseNotification()) { - listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification()); + listener.onRemoteSessionCloseCommand(sessionId, toSessionMsg.getSessionCloseNotification()); } if (toSessionMsg.hasToTransportUpdateCredentialsNotification()) { listener.onToTransportUpdateCredentials(toSessionMsg.getToTransportUpdateCredentialsNotification());