Browse Source

Merge pull request #4368 from YevhenBondarenko/develop/3.3-lwm2m-queue-mode

lwm2m queue mode
pull/4416/head
Igor Kulikov 5 years ago
committed by GitHub
parent
commit
168b8d86eb
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java
  2. 20
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mQueuedRequest.java
  3. 21
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportRequest.java
  4. 19
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java
  5. 5
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java

7
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java

@ -40,7 +40,7 @@ import org.thingsboard.server.transport.lwm2m.utils.TypeServer;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@ -70,8 +70,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
@Override
public List<SecurityInfo> getAllByEndpoint(String endPoint) {
String endPointKey = endPoint;
ReadResultSecurityStore store = lwM2MCredentialsSecurityInfoValidator.createAndValidateCredentialsSecurityInfo(endPointKey, TypeServer.BOOTSTRAP);
ReadResultSecurityStore store = lwM2MCredentialsSecurityInfoValidator.createAndValidateCredentialsSecurityInfo(endPoint, TypeServer.BOOTSTRAP);
if (store.getBootstrapJsonCredential() != null && store.getSecurityMode() < LwM2MSecurityMode.DEFAULT_MODE.code) {
/** add value to store from BootstrapJson */
this.setBootstrapConfigScurityInfo(store);
@ -87,7 +86,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
} catch (InvalidConfigurationException e) {
log.error("", e);
}
return store.getSecurityInfo() == null ? null : Arrays.asList(store.getSecurityInfo());
return store.getSecurityInfo() == null ? null : Collections.singletonList(store.getSecurityInfo());
}
}
return null;

20
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mQueuedRequest.java

@ -0,0 +1,20 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server;
public interface LwM2mQueuedRequest {
void send();
}

21
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportRequest.java

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.lwm2m.server;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.leshan.core.attributes.Attribute;
import org.eclipse.leshan.core.attributes.AttributeSet;
@ -34,6 +35,7 @@ import org.eclipse.leshan.core.request.ObserveRequest;
import org.eclipse.leshan.core.request.ReadRequest;
import org.eclipse.leshan.core.request.WriteAttributesRequest;
import org.eclipse.leshan.core.request.WriteRequest;
import org.eclipse.leshan.core.request.exception.ClientSleepingException;
import org.eclipse.leshan.core.response.CancelObservationResponse;
import org.eclipse.leshan.core.response.DeleteResponse;
import org.eclipse.leshan.core.response.DiscoverResponse;
@ -58,7 +60,6 @@ import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.eclipse.californium.core.coap.CoAP.ResponseCode.isSuccess;
import static org.eclipse.leshan.core.attributes.Attribute.MINIMUM_PERIOD;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportHandler.DEFAULT_TIMEOUT;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportHandler.GET_TYPE_OPER_DISCOVER;
@ -215,13 +216,19 @@ public class LwM2mTransportRequest {
request = new WriteAttributesRequest(resultIds.getObjectId(), attrSet);
}
break;
default:
}
if (request != null) {
this.sendRequest(registration, lwM2MClient, request, timeoutInMs);
}
else {
try {
this.sendRequest(registration, lwM2MClient, request, timeoutInMs);
} catch (ClientSleepingException e) {
DownlinkRequest finalRequest = request;
long finalTimeoutInMs = timeoutInMs;
lwM2MClient.getQueuedRequests().add(() -> sendRequest(registration, lwM2MClient, finalRequest, finalTimeoutInMs));
} catch (Exception e) {
log.error("[{}] [{}] [{}] Failed to send downlink.", registration.getEndpoint(), targetIdVer, typeOper, e);
}
} else {
log.error("[{}], [{}] - [{}] error SendRequest", registration.getEndpoint(), typeOper, targetIdVer);
}
}
@ -240,7 +247,7 @@ public class LwM2mTransportRequest {
if (!lwM2MClient.isInit()) {
lwM2MClient.initValue(this.serviceImpl, convertToIdVerFromObjectId(request.getPath().toString(), registration));
}
if (isSuccess(((Response) response.getCoapResponse()).getCode())) {
if (CoAP.ResponseCode.isSuccess(((Response) response.getCoapResponse()).getCode())) {
this.handleResponse(registration, request.getPath().toString(), response, request);
if (request instanceof WriteRequest && ((WriteRequest) request).isReplaceRequest()) {
String msg = String.format("%s: sendRequest Replace: CoapCde - %s Lwm2m code - %d name - %s Resource path - %s value - %s SendRequest to Client",
@ -283,7 +290,7 @@ public class LwM2mTransportRequest {
case FLOAT: // Double
return (contentFormat == null) ? new WriteRequest(objectId, instanceId, resourceId, Double.parseDouble(value.toString())) : new WriteRequest(contentFormat, objectId, instanceId, resourceId, Double.parseDouble(value.toString()));
case TIME: // Date
Date date = new Date(Long.decode(value.toString()));
Date date = new Date(Long.decode(value.toString()));
return (contentFormat == null) ? new WriteRequest(objectId, instanceId, resourceId, date) : new WriteRequest(contentFormat, objectId, instanceId, resourceId, date);
case OPAQUE: // byte[] value, base64
return (contentFormat == null) ? new WriteRequest(objectId, instanceId, resourceId, Hex.decodeHex(value.toString().toCharArray())) : new WriteRequest(contentFormat, objectId, instanceId, resourceId, Hex.decodeHex(value.toString().toCharArray()));

19
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java

@ -146,9 +146,9 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService {
* Next -> Create new LwM2MClient for current session -> setModelClient...
*
* @param registration - Registration LwM2M Client
* @param previousObsersations - may be null
* @param previousObservations - may be null
*/
public void onRegistered(Registration registration, Collection<Observation> previousObsersations) {
public void onRegistered(Registration registration, Collection<Observation> previousObservations) {
executorRegistered.submit(() -> {
try {
log.warn("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId());
@ -188,6 +188,13 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService {
LwM2mClient lwM2MClient = this.lwM2mClientContext.getLwM2MClient(sessionInfo);
if (lwM2MClient.getDeviceId() == null && lwM2MClient.getProfileId() == null) {
initLwM2mClient(lwM2MClient, sessionInfo);
} else {
if (registration.getBindingMode().useQueueMode()) {
LwM2mQueuedRequest request;
while ((request = lwM2MClient.getQueuedRequests().poll()) != null) {
request.send();
}
}
}
log.info("Client: [{}] updatedReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
@ -206,7 +213,7 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService {
* !!! Warn: if have not finishing unReg, then this operation will be finished on next Client`s connect
*/
public void unReg(Registration registration, Collection<Observation> observations) {
executorUnRegistered.submit(() -> {
executorUnRegistered.submit(() -> {
try {
this.setCancelObservations(registration);
this.sendLogsToThingsboard(LOG_LW2M_INFO + ": Client unRegistration", registration);
@ -239,8 +246,11 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService {
}
}
@Override
public void onSleepingDev(Registration registration) {
log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint());
this.sendLogsToThingsboard(LOG_LW2M_INFO + ": Client is sleeping!", registration);
//TODO: associate endpointId with device information.
}
@ -417,6 +427,7 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService {
*/
protected void onAwakeDev(Registration registration) {
log.info("[{}] [{}] Received endpoint Awake version event", registration.getId(), registration.getEndpoint());
this.sendLogsToThingsboard(LOG_LW2M_INFO + ": Client is awake!", registration);
//TODO: associate endpointId with device information.
}
@ -612,7 +623,7 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService {
if (GET_TYPE_OPER_READ.equals(typeOper)) {
result = JacksonUtil.fromString(lwM2MClientProfile.getPostAttributeProfile().toString(), new TypeReference<>() {
});
result.addAll(JacksonUtil.fromString(lwM2MClientProfile.getPostTelemetryProfile().toString(), new TypeReference<>() {
result.addAll(JacksonUtil.convertValue(lwM2MClientProfile.getPostTelemetryProfile().toString(), new TypeReference<>() {
}));
} else {
result = JacksonUtil.fromString(lwM2MClientProfile.getPostObserveProfile().toString(), new TypeReference<>() {

5
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java

@ -25,13 +25,16 @@ import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.security.SecurityInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.transport.lwm2m.server.LwM2mQueuedRequest;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServiceImpl;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@ -54,6 +57,7 @@ public class LwM2mClient implements Cloneable {
private final Map<String, ResourceValue> resources;
private final Map<String, TransportProtos.TsKvProto> delayedRequests;
private final List<String> pendingRequests;
private final Queue<LwM2mQueuedRequest> queuedRequests;
private boolean init;
public Object clone() throws CloneNotSupportedException {
@ -71,6 +75,7 @@ public class LwM2mClient implements Cloneable {
this.profileId = profileId;
this.sessionId = sessionId;
this.init = false;
this.queuedRequests = new ConcurrentLinkedQueue<>();
}
public boolean saveResourceValue(String pathRez, LwM2mResource rez, LwM2mModelProvider modelProvider) {

Loading…
Cancel
Save