From bb90b2f1375514b32a802037a4d1e90017acb950 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 6 Apr 2021 07:38:52 +0300 Subject: [PATCH] lwm2m queue mode --- .../secure/LwM2MBootstrapSecurityStore.java | 7 +++--- .../lwm2m/server/LwM2mQueuedRequest.java | 20 ++++++++++++++++ .../lwm2m/server/LwM2mTransportRequest.java | 23 ++++++++++++------- .../server/LwM2mTransportServiceImpl.java | 19 +++++++++++---- .../lwm2m/server/client/LwM2mClient.java | 5 ++++ 5 files changed, 58 insertions(+), 16 deletions(-) create mode 100644 common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mQueuedRequest.java diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java index ee2b3bac20..52638a99e2 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java @@ -40,7 +40,7 @@ import org.thingsboard.server.transport.lwm2m.utils.TypeServer; import java.io.IOException; import java.security.GeneralSecurityException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -70,8 +70,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore { @Override public List getAllByEndpoint(String endPoint) { - String endPointKey = endPoint; - ReadResultSecurityStore store = lwM2MCredentialsSecurityInfoValidator.createAndValidateCredentialsSecurityInfo(endPointKey, TypeServer.BOOTSTRAP); + ReadResultSecurityStore store = lwM2MCredentialsSecurityInfoValidator.createAndValidateCredentialsSecurityInfo(endPoint, TypeServer.BOOTSTRAP); if (store.getBootstrapJsonCredential() != null && store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) { /** add value to store from BootstrapJson */ this.setBootstrapConfigScurityInfo(store); @@ -87,7 +86,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore { } catch (InvalidConfigurationException e) { log.error("", e); } - return store.getSecurityInfo() == null ? null : Arrays.asList(store.getSecurityInfo()); + return store.getSecurityInfo() == null ? null : Collections.singletonList(store.getSecurityInfo()); } } return null; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mQueuedRequest.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mQueuedRequest.java new file mode 100644 index 0000000000..dfc88a3331 --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mQueuedRequest.java @@ -0,0 +1,20 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server; + +public interface LwM2mQueuedRequest { + void send(); +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportRequest.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportRequest.java index ad1e542ce1..e237146c8a 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportRequest.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportRequest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.lwm2m.server; import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Response; import org.eclipse.leshan.core.attributes.Attribute; import org.eclipse.leshan.core.attributes.AttributeSet; @@ -34,6 +35,7 @@ import org.eclipse.leshan.core.request.ObserveRequest; import org.eclipse.leshan.core.request.ReadRequest; import org.eclipse.leshan.core.request.WriteAttributesRequest; import org.eclipse.leshan.core.request.WriteRequest; +import org.eclipse.leshan.core.request.exception.ClientSleepingException; import org.eclipse.leshan.core.response.CancelObservationResponse; import org.eclipse.leshan.core.response.DeleteResponse; import org.eclipse.leshan.core.response.DiscoverResponse; @@ -58,7 +60,6 @@ import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.eclipse.californium.core.coap.CoAP.ResponseCode.isSuccess; import static org.eclipse.leshan.core.attributes.Attribute.MINIMUM_PERIOD; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportHandler.DEFAULT_TIMEOUT; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportHandler.GET_TYPE_OPER_DISCOVER; @@ -122,7 +123,7 @@ public class LwM2mTransportRequest { DownlinkRequest request = null; ContentFormat contentFormat = contentFormatParam != null ? ContentFormat.fromName(contentFormatParam.toUpperCase()) : null; LwM2mClient lwM2MClient = lwM2mClientContext.getLwM2mClientWithReg(registration, null); - ResourceModel resource = lwM2MClient.getResourceModel(target); + ResourceModel resource = lwM2MClient.getResourceModel(targetIdVer); timeoutInMs = timeoutInMs > 0 ? timeoutInMs : DEFAULT_TIMEOUT; switch (typeOper) { case GET_TYPE_OPER_READ: @@ -215,13 +216,19 @@ public class LwM2mTransportRequest { request = new WriteAttributesRequest(resultIds.getObjectId(), attrSet); } break; - default: } if (request != null) { - this.sendRequest(registration, lwM2MClient, request, timeoutInMs); - } - else { + try { + this.sendRequest(registration, lwM2MClient, request, timeoutInMs); + } catch (ClientSleepingException e) { + DownlinkRequest finalRequest = request; + long finalTimeoutInMs = timeoutInMs; + lwM2MClient.getQueuedRequests().add(() -> sendRequest(registration, lwM2MClient, finalRequest, finalTimeoutInMs)); + } catch (Exception e) { + log.error("[{}] [{}] [{}] Failed to send downlink.", registration.getEndpoint(), targetIdVer, typeOper, e); + } + } else { log.error("[{}], [{}] - [{}] error SendRequest", registration.getEndpoint(), typeOper, targetIdVer); } } @@ -240,7 +247,7 @@ public class LwM2mTransportRequest { if (!lwM2MClient.isInit()) { lwM2MClient.initValue(this.serviceImpl, convertToIdVerFromObjectId(request.getPath().toString(), registration)); } - if (isSuccess(((Response) response.getCoapResponse()).getCode())) { + if (CoAP.ResponseCode.isSuccess(((Response) response.getCoapResponse()).getCode())) { this.handleResponse(registration, request.getPath().toString(), response, request); if (request instanceof WriteRequest && ((WriteRequest) request).isReplaceRequest()) { String msg = String.format("%s: sendRequest Replace: CoapCde - %s Lwm2m code - %d name - %s Resource path - %s value - %s SendRequest to Client", @@ -283,7 +290,7 @@ public class LwM2mTransportRequest { case FLOAT: // Double return (contentFormat == null) ? new WriteRequest(objectId, instanceId, resourceId, Double.parseDouble(value.toString())) : new WriteRequest(contentFormat, objectId, instanceId, resourceId, Double.parseDouble(value.toString())); case TIME: // Date - Date date = new Date(Long.decode(value.toString())); + Date date = new Date(Long.decode(value.toString())); return (contentFormat == null) ? new WriteRequest(objectId, instanceId, resourceId, date) : new WriteRequest(contentFormat, objectId, instanceId, resourceId, date); case OPAQUE: // byte[] value, base64 return (contentFormat == null) ? new WriteRequest(objectId, instanceId, resourceId, Hex.decodeHex(value.toString().toCharArray())) : new WriteRequest(contentFormat, objectId, instanceId, resourceId, Hex.decodeHex(value.toString().toCharArray())); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java index f8d2716808..96da8721c8 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java @@ -146,9 +146,9 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { * Next -> Create new LwM2MClient for current session -> setModelClient... * * @param registration - Registration LwM2M Client - * @param previousObsersations - may be null + * @param previousObservations - may be null */ - public void onRegistered(Registration registration, Collection previousObsersations) { + public void onRegistered(Registration registration, Collection previousObservations) { executorRegistered.submit(() -> { try { log.warn("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId()); @@ -188,6 +188,13 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { LwM2mClient lwM2MClient = this.lwM2mClientContext.getLwM2MClient(sessionInfo); if (lwM2MClient.getDeviceId() == null && lwM2MClient.getProfileId() == null) { initLwM2mClient(lwM2MClient, sessionInfo); + } else { + if (registration.getBindingMode().useQueueMode()) { + LwM2mQueuedRequest request; + while ((request = lwM2MClient.getQueuedRequests().poll()) != null) { + request.send(); + } + } } log.info("Client: [{}] updatedReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType()); @@ -206,7 +213,7 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { * !!! Warn: if have not finishing unReg, then this operation will be finished on next Client`s connect */ public void unReg(Registration registration, Collection observations) { - executorUnRegistered.submit(() -> { + executorUnRegistered.submit(() -> { try { this.setCancelObservations(registration); this.sendLogsToThingsboard(LOG_LW2M_INFO + ": Client unRegistration", registration); @@ -239,8 +246,11 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { } } + @Override public void onSleepingDev(Registration registration) { log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint()); + this.sendLogsToThingsboard(LOG_LW2M_INFO + ": Client is sleeping!", registration); + //TODO: associate endpointId with device information. } @@ -417,6 +427,7 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { */ protected void onAwakeDev(Registration registration) { log.info("[{}] [{}] Received endpoint Awake version event", registration.getId(), registration.getEndpoint()); + this.sendLogsToThingsboard(LOG_LW2M_INFO + ": Client is awake!", registration); //TODO: associate endpointId with device information. } @@ -612,7 +623,7 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { if (GET_TYPE_OPER_READ.equals(typeOper)) { result = JacksonUtil.fromString(lwM2MClientProfile.getPostAttributeProfile().toString(), new TypeReference<>() { }); - result.addAll(JacksonUtil.fromString(lwM2MClientProfile.getPostTelemetryProfile().toString(), new TypeReference<>() { + result.addAll(JacksonUtil.convertValue(lwM2MClientProfile.getPostTelemetryProfile().toString(), new TypeReference<>() { })); } else { result = JacksonUtil.fromString(lwM2MClientProfile.getPostObserveProfile().toString(), new TypeReference<>() { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java index bdb783e3ff..59786e37c6 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java @@ -25,13 +25,16 @@ import org.eclipse.leshan.server.registration.Registration; import org.eclipse.leshan.server.security.SecurityInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; +import org.thingsboard.server.transport.lwm2m.server.LwM2mQueuedRequest; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServiceImpl; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -54,6 +57,7 @@ public class LwM2mClient implements Cloneable { private final Map resources; private final Map delayedRequests; private final List pendingRequests; + private final Queue queuedRequests; private boolean init; public Object clone() throws CloneNotSupportedException { @@ -71,6 +75,7 @@ public class LwM2mClient implements Cloneable { this.profileId = profileId; this.sessionId = sessionId; this.init = false; + this.queuedRequests = new ConcurrentLinkedQueue<>(); } public boolean saveResourceValue(String pathRez, LwM2mResource rez, LwM2mModelProvider modelProvider) {