From bed454ec0fcace61736ca58129d1b74646fdffba Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 17 Jun 2021 18:29:33 +0300 Subject: [PATCH] LwM2M Refactoring of OTA --- .../src/main/resources/thingsboard.yml | 7 +- .../transport/TransportSqlTestSuite.java | 14 +- .../config/LwM2MTransportServerConfig.java | 20 +- .../server/DefaultLwM2mTransportService.java | 5 +- .../lwm2m/server/LwM2mNetworkConfig.java | 6 +- .../server/LwM2mTransportCoapResource.java | 9 +- .../lwm2m/server/LwM2mTransportUtil.java | 46 +- .../lwm2m/server/UpdateResultFw.java | 13 +- .../DefaultLwM2MAttributesService.java | 14 +- .../lwm2m/server/client/LwM2mClient.java | 22 - .../lwm2m/server/client/LwM2mFwSwUpdate.java | 486 ------------------ .../common/LwM2MExecutorAwareService.java | 41 ++ .../AbstractTbLwM2MRequestCallback.java | 12 +- .../DefaultLwM2mDownlinkMsgHandler.java | 62 ++- .../TbLwM2MCancelAllObserveCallback.java | 8 +- .../TbLwM2MCancelObserveCallback.java | 7 +- .../downlink/TbLwM2MDeleteCallback.java | 5 +- .../downlink/TbLwM2MDiscoverCallback.java | 5 +- .../downlink/TbLwM2MExecuteCallback.java | 5 +- .../downlink/TbLwM2MObserveCallback.java | 7 +- .../server/downlink/TbLwM2MReadCallback.java | 7 +- .../downlink/TbLwM2MTargetedCallback.java | 7 +- .../TbLwM2MUplinkTargetedCallback.java | 35 ++ .../TbLwM2MWriteAttributesCallback.java | 5 +- .../TbLwM2MWriteResponseCallback.java | 7 +- .../log/DefaultLwM2MTelemetryLogService.java | 56 ++ .../server/log/LwM2MTelemetryLogService.java | 26 + .../ota/DefaultLwM2MOtaUpdateService.java | 163 ++++-- .../lwm2m/server/ota/LwM2MClientOtaInfo.java | 23 +- .../server/ota/LwM2MOtaUpdateService.java | 6 +- .../rpc/DefaultLwM2MRpcRequestHandler.java | 22 +- .../uplink/DefaultLwM2MUplinkMsgHandler.java | 197 ++----- .../server/uplink/LwM2mUplinkMsgHandler.java | 4 - .../src/main/resources/tb-lwm2m-transport.yml | 8 +- 34 files changed, 479 insertions(+), 881 deletions(-) delete mode 100644 common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mFwSwUpdate.java create mode 100644 common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/common/LwM2MExecutorAwareService.java create mode 100644 common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MUplinkTargetedCallback.java create mode 100644 common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java create mode 100644 common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/LwM2MTelemetryLogService.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 54721ba2fa..fcde169fc9 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -677,12 +677,11 @@ transport: timeout: "${LWM2M_TIMEOUT:120000}" recommended_ciphers: "${LWM2M_RECOMMENDED_CIPHERS:false}" recommended_supported_groups: "${LWM2M_RECOMMENDED_SUPPORTED_GROUPS:true}" - response_pool_size: "${LWM2M_RESPONSE_POOL_SIZE:100}" - registered_pool_size: "${LWM2M_REGISTERED_POOL_SIZE:10}" + uplink_pool_size: "${LWM2M_UPLINK_POOL_SIZE:10}" + downlink_pool_size: "${LWM2M_DOWNLINK_POOL_SIZE:10}" + ota_pool_size: "${LWM2M_OTA_POOL_SIZE:10}" registration_store_pool_size: "${LWM2M_REGISTRATION_STORE_POOL_SIZE:100}" clean_period_in_sec: "${LWM2M_CLEAN_PERIOD_IN_SEC:2}" - update_registered_pool_size: "${LWM2M_UPDATE_REGISTERED_POOL_SIZE:10}" - un_registered_pool_size: "${LWM2M_UN_REGISTERED_POOL_SIZE:10}" log_max_length: "${LWM2M_LOG_MAX_LENGTH:100}" # Use redis for Security and Registration stores redis.enabled: "${LWM2M_REDIS_ENABLED:false}" diff --git a/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java index d059ea1449..25df3bee00 100644 --- a/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/transport/TransportSqlTestSuite.java @@ -26,13 +26,13 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ -// "org.thingsboard.server.transport.*.rpc.sql.*Test", -// "org.thingsboard.server.transport.*.telemetry.timeseries.sql.*Test", -// "org.thingsboard.server.transport.*.telemetry.attributes.sql.*Test", -// "org.thingsboard.server.transport.*.attributes.updates.sql.*Test", -// "org.thingsboard.server.transport.*.attributes.request.sql.*Test", -// "org.thingsboard.server.transport.*.claim.sql.*Test", -// "org.thingsboard.server.transport.*.provision.sql.*Test", + "org.thingsboard.server.transport.*.rpc.sql.*Test", + "org.thingsboard.server.transport.*.telemetry.timeseries.sql.*Test", + "org.thingsboard.server.transport.*.telemetry.attributes.sql.*Test", + "org.thingsboard.server.transport.*.attributes.updates.sql.*Test", + "org.thingsboard.server.transport.*.attributes.request.sql.*Test", + "org.thingsboard.server.transport.*.claim.sql.*Test", + "org.thingsboard.server.transport.*.provision.sql.*Test", "org.thingsboard.server.transport.lwm2m.*Test" }) public class TransportSqlTestSuite { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java index 25c7766895..a2ff361712 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/config/LwM2MTransportServerConfig.java @@ -57,29 +57,21 @@ public class LwM2MTransportServerConfig implements LwM2MSecureServerConfig { private boolean recommendedSupportedGroups; @Getter - @Value("${transport.lwm2m.response_pool_size:}") - private int responsePoolSize; + @Value("${transport.lwm2m.downlink_pool_size:}") + private int downlinkPoolSize; @Getter - @Value("${transport.lwm2m.registered_pool_size:}") - private int registeredPoolSize; + @Value("${transport.lwm2m.uplink_pool_size:}") + private int uplinkPoolSize; @Getter - @Value("${transport.lwm2m.registration_store_pool_size:}") - private int registrationStorePoolSize; + @Value("${transport.lwm2m.ota_pool_size:}") + private int otaPoolSize; @Getter @Value("${transport.lwm2m.clean_period_in_sec:}") private int cleanPeriodInSec; - @Getter - @Value("${transport.lwm2m.update_registered_pool_size:}") - private int updateRegisteredPoolSize; - - @Getter - @Value("${transport.lwm2m.un_registered_pool_size:}") - private int unRegisteredPoolSize; - @Getter @Value("${transport.lwm2m.security.key_store_type:}") private String keyStoreType; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java index 97ab82f802..9425ef7891 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java @@ -27,6 +27,7 @@ import org.eclipse.leshan.server.californium.LeshanServerBuilder; import org.eclipse.leshan.server.californium.registration.CaliforniumRegistrationStore; import org.eclipse.leshan.server.model.LwM2mModelProvider; import org.springframework.stereotype.Component; +import org.thingsboard.server.cache.ota.OtaPackageDataCache; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; @@ -80,6 +81,7 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService { private final LwM2mTransportContext context; private final LwM2MTransportServerConfig config; private final LwM2mTransportServerHelper helper; + private final OtaPackageDataCache otaPackageDataCache; private final DefaultLwM2MUplinkMsgHandler handler; private final CaliforniumRegistrationStore registrationStore; private final TbSecurityStore securityStore; @@ -103,8 +105,7 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService { * "coap://host:port/{path}/{token}/{nameFile}" */ - - LwM2mTransportCoapResource otaCoapResource = new LwM2mTransportCoapResource(handler, FIRMWARE_UPDATE_COAP_RECOURSE); + LwM2mTransportCoapResource otaCoapResource = new LwM2mTransportCoapResource(otaPackageDataCache, FIRMWARE_UPDATE_COAP_RECOURSE); this.server.coap().getServer().add(otaCoapResource); this.startLhServer(); this.context.setServer(server); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mNetworkConfig.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mNetworkConfig.java index 00f70dbcf7..4fc60aecfc 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mNetworkConfig.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mNetworkConfig.java @@ -84,10 +84,10 @@ public class LwM2mNetworkConfig { Create new instance of udp endpoint context matcher. Params: checkAddress - – true with address check, (STRICT, UDP) + – true with address check, (STRICT, UDP) - if port Registration of client is changed - it is bad - false, without */ - coapConfig.setString(NetworkConfig.Keys.RESPONSE_MATCHING, "STRICT"); + coapConfig.setString(NetworkConfig.Keys.RESPONSE_MATCHING, "RELAXED"); /** https://tools.ietf.org/html/rfc7959#section-2.9.3 The block size (number of bytes) to use when doing a blockwise transfer. \ @@ -103,7 +103,7 @@ public class LwM2mNetworkConfig { */ coapConfig.setInt(NetworkConfig.Keys.MAX_MESSAGE_SIZE, 1024); - coapConfig.setInt(NetworkConfig.Keys.MAX_RETRANSMIT, 4); + coapConfig.setInt(NetworkConfig.Keys.MAX_RETRANSMIT, 10); return coapConfig; } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java index ff86ee9bd9..229ec1a08c 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportCoapResource.java @@ -24,6 +24,7 @@ import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.server.resources.CoapExchange; import org.eclipse.californium.core.server.resources.Resource; import org.eclipse.californium.core.server.resources.ResourceObserver; +import org.thingsboard.server.cache.ota.OtaPackageDataCache; import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2MUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; @@ -39,11 +40,11 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.S public class LwM2mTransportCoapResource extends AbstractLwM2mTransportResource { private final ConcurrentMap tokenToObserveRelationMap = new ConcurrentHashMap<>(); private final ConcurrentMap tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>(); - private final LwM2mUplinkMsgHandler handler; + private final OtaPackageDataCache otaPackageDataCache; - public LwM2mTransportCoapResource(LwM2mUplinkMsgHandler handler, String name) { + public LwM2mTransportCoapResource(OtaPackageDataCache otaPackageDataCache, String name) { super(name); - this.handler = handler; + this.otaPackageDataCache = otaPackageDataCache; this.setObservable(true); // enable observing this.addObserver(new CoapResourceObserver()); } @@ -154,7 +155,7 @@ public class LwM2mTransportCoapResource extends AbstractLwM2mTransportResource { } private byte[] getOtaData(UUID currentId) { - return ((DefaultLwM2MUplinkMsgHandler) handler).otaPackageDataCache.get(currentId.toString()); + return otaPackageDataCache.get(currentId.toString()); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java index f310eb6481..d04e4993f1 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportUtil.java @@ -201,36 +201,12 @@ public class LwM2mTransportUtil { } } - /** - * FirmwareUpdateStatus { - * DOWNLOADING, DOWNLOADED, VERIFIED, UPDATING, UPDATED, FAILED - */ - public static OtaPackageUpdateStatus equalsFwSateFwResultToFirmwareUpdateStatus(UpdateStateFw updateStateFw, UpdateResultFw updateResultFw) { - switch (updateResultFw) { - case INITIAL: - return toOtaPackageUpdateStatus(updateStateFw); - case UPDATE_SUCCESSFULLY: - return UPDATED; - case NOT_ENOUGH: - case OUT_OFF_MEMORY: - case CONNECTION_LOST: - case INTEGRITY_CHECK_FAILURE: - case UNSUPPORTED_TYPE: - case INVALID_URI: - case UPDATE_FAILED: - case UNSUPPORTED_PROTOCOL: - return FAILED; - default: - throw new CodecException("Invalid value stateFw %s %s for FirmwareUpdateStatus.", updateStateFw.name(), updateResultFw.name()); - } - } - - public static OtaPackageUpdateStatus toOtaPackageUpdateStatus(UpdateResultFw updateResultFw) { + public static Optional toOtaPackageUpdateStatus(UpdateResultFw updateResultFw) { switch (updateResultFw) { case INITIAL: - return VERIFIED; + return Optional.empty(); case UPDATE_SUCCESSFULLY: - return UPDATED; + return Optional.of(UPDATED); case NOT_ENOUGH: case OUT_OFF_MEMORY: case CONNECTION_LOST: @@ -239,22 +215,22 @@ public class LwM2mTransportUtil { case INVALID_URI: case UPDATE_FAILED: case UNSUPPORTED_PROTOCOL: - return FAILED; + return Optional.of(FAILED); default: throw new CodecException("Invalid value stateFw %s for FirmwareUpdateStatus.", updateResultFw.name()); } } - public static OtaPackageUpdateStatus toOtaPackageUpdateStatus(UpdateStateFw updateStateFw) { + public static Optional toOtaPackageUpdateStatus(UpdateStateFw updateStateFw) { switch (updateStateFw) { case IDLE: - return VERIFIED; + return Optional.empty(); case DOWNLOADING: - return DOWNLOADING; + return Optional.of(DOWNLOADING); case DOWNLOADED: - return DOWNLOADED; + return Optional.of(DOWNLOADED); case UPDATING: - return UPDATING; + return Optional.of(UPDATING); default: throw new CodecException("Invalid value stateFw %d for FirmwareUpdateStatus.", updateStateFw); } @@ -471,7 +447,7 @@ public class LwM2mTransportUtil { return lwM2mOtaConvert; } else if (FW_RESULT_ID.equals(path)) { lwM2mOtaConvert.setCurrentType(STRING); - lwM2mOtaConvert.setValue(UpdateResultFw.fromUpdateResultFwByCode(((Long) value).intValue()).type); + lwM2mOtaConvert.setValue(UpdateResultFw.fromUpdateResultFwByCode(((Long) value).intValue()).getType()); return lwM2mOtaConvert; } else if (SW_UPDATE_STATE_ID.equals(path)) { lwM2mOtaConvert.setCurrentType(STRING); @@ -585,7 +561,7 @@ public class LwM2mTransportUtil { public static String fromVersionedIdToObjectId(String pathIdVer) { try { - if(pathIdVer == null) { + if (pathIdVer == null) { return null; } String[] keyArray = pathIdVer.split(LWM2M_SEPARATOR_PATH); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/UpdateResultFw.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/UpdateResultFw.java index 1b2e7d6b26..79361aeb1d 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/UpdateResultFw.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/UpdateResultFw.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.transport.lwm2m.server; +import lombok.Getter; + /** * FW Update Result * 0: Initial value. Once the updating process is initiated (Download /Update), this Resource MUST be reset to Initial value. @@ -40,14 +42,17 @@ public enum UpdateResultFw { UPDATE_FAILED(8, "Firmware update failed", false), UNSUPPORTED_PROTOCOL(9, "Unsupported protocol", false); - public int code; - public String type; - public boolean isAgain; + @Getter + private int code; + @Getter + private String type; + @Getter + private boolean again; UpdateResultFw(int code, String type, boolean isAgain) { this.code = code; this.type = type; - this.isAgain = isAgain; + this.again = isAgain; } public static UpdateResultFw fromUpdateResultFwByType(String type) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java index ed717f3eee..ee723dbaa6 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/attributes/DefaultLwM2MAttributesService.java @@ -39,6 +39,7 @@ import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteReplaceRequest; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteResponseCallback; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.ota.DefaultLwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; @@ -71,6 +72,7 @@ public class DefaultLwM2MAttributesService implements LwM2MAttributesService { private final LwM2MTransportServerConfig config; private final LwM2mUplinkMsgHandler uplinkHandler; private final LwM2mDownlinkMsgHandler downlinkHandler; + private final LwM2MTelemetryLogService logService; private final LwM2MOtaUpdateService otaUpdateService; @Override @@ -150,14 +152,6 @@ public class DefaultLwM2MAttributesService implements LwM2MAttributesService { if (!otherAttributes.isEmpty()) { onAttributesUpdate(lwM2MClient, otherAttributes); } - } else if (msg.getSharedDeletedCount() > 0 && lwM2MClient != null) { - msg.getSharedUpdatedList().forEach(tsKvProto -> { - String pathName = tsKvProto.getKv().getKey(); - Object valueNew = getValueFromKvProto(tsKvProto.getKv()); - if (OtaPackageUtil.getAttributeKey(OtaPackageType.FIRMWARE, OtaPackageKey.VERSION).equals(pathName) && !valueNew.equals(lwM2MClient.getFwUpdate().getCurrentVersion())) { - lwM2MClient.getFwUpdate().setCurrentVersion((String) valueNew); - } - }); } else if (lwM2MClient == null) { log.error("OnAttributeUpdate, lwM2MClient is null"); } @@ -197,12 +191,12 @@ public class DefaultLwM2MAttributesService implements LwM2MAttributesService { private void pushUpdateToClientIfNeeded(LwM2mClient lwM2MClient, Object valueOld, Object newValue, String versionedId) { if (newValue != null && (valueOld == null || !newValue.toString().equals(valueOld.toString()))) { TbLwM2MWriteReplaceRequest request = TbLwM2MWriteReplaceRequest.builder().versionedId(versionedId).value(newValue).timeout(this.config.getTimeout()).build(); - downlinkHandler.sendWriteReplaceRequest(lwM2MClient, request, new TbLwM2MWriteResponseCallback(uplinkHandler, lwM2MClient, versionedId)); + downlinkHandler.sendWriteReplaceRequest(lwM2MClient, request, new TbLwM2MWriteResponseCallback(uplinkHandler, logService, lwM2MClient, versionedId)); } else { log.error("Failed update resource [{}] [{}]", versionedId, newValue); String logMsg = String.format("%s: Failed update resource versionedId - %s value - %s. Value is not changed or bad", LOG_LWM2M_ERROR, versionedId, newValue); - uplinkHandler.logToTelemetry(lwM2MClient, logMsg); + logService.log(lwM2MClient, logMsg); log.info("Failed update resource [{}] [{}]", versionedId, newValue); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java index 21d6ec3d48..0e71a1e865 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java @@ -95,12 +95,6 @@ public class LwM2mClient implements Cloneable { private UUID profileId; @Getter @Setter - private volatile LwM2mFwSwUpdate fwUpdate; - @Getter - @Setter - private volatile LwM2mFwSwUpdate swUpdate; - @Getter - @Setter private Registration registration; private ValidateDeviceCredentialsResponse credentials; @@ -365,21 +359,5 @@ public class LwM2mClient implements Cloneable { } } - public LwM2mFwSwUpdate getFwUpdate(LwM2mUplinkMsgHandler handler, LwM2mClientContext clientContext) { - if (this.fwUpdate == null) { - var profile = clientContext.getProfile(this.getProfileId()); - this.fwUpdate = new LwM2mFwSwUpdate(handler, this, OtaPackageType.FIRMWARE, profile.getClientLwM2mSettings().getFwUpdateStrategy()); - } - return this.fwUpdate; - } - - public LwM2mFwSwUpdate getSwUpdate(LwM2mUplinkMsgHandler handler, LwM2mClientContext clientContext) { - if (this.swUpdate == null) { - var profile = clientContext.getProfile(this.getProfileId()); - this.swUpdate = new LwM2mFwSwUpdate(handler, this, OtaPackageType.SOFTWARE, profile.getClientLwM2mSettings().getSwUpdateStrategy()); - } - return this.fwUpdate; - } - } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mFwSwUpdate.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mFwSwUpdate.java deleted file mode 100644 index b39f6084cd..0000000000 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mFwSwUpdate.java +++ /dev/null @@ -1,486 +0,0 @@ -/** - * Copyright © 2016-2021 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.lwm2m.server.client; - -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.eclipse.leshan.server.registration.Registration; -import org.thingsboard.server.common.data.ota.OtaPackageType; -import org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.transport.lwm2m.server.LwM2MFirmwareUpdateStrategy; -import org.thingsboard.server.transport.lwm2m.server.LwM2mOperationType; -import org.thingsboard.server.transport.lwm2m.server.UpdateStateFw; -import org.thingsboard.server.transport.lwm2m.server.UpdateResultFw; -import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2MUplinkMsgHandler; -import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; -import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil; -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; -import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MExecuteRequest; -import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MExecuteCallback; -import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveRequest; -import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveCallback; -import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteResponseCallback; -import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteReplaceRequest; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; - -import static org.thingsboard.server.common.data.ota.OtaPackageKey.STATE; -import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE; -import static org.thingsboard.server.common.data.ota.OtaPackageType.SOFTWARE; -import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.DOWNLOADED; -import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.FAILED; -import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.INITIATED; -import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.UPDATED; -import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.UPDATING; -import static org.thingsboard.server.common.data.ota.OtaPackageUtil.getAttributeKey; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FIRMWARE_UPDATE_COAP_RECOURSE; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_3_VER_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_5_VER_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_NAME_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_PACKAGE_19_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_PACKAGE_5_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_PACKAGE_URI_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_RESULT_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_STATE_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_UPDATE; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_UPDATE_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_ERROR; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; -import static org.thingsboard.server.transport.lwm2m.server.LwM2MFirmwareUpdateStrategy.OBJ_19_BINARY; -import static org.thingsboard.server.transport.lwm2m.server.LwM2MFirmwareUpdateStrategy.OBJ_5_BINARY; -import static org.thingsboard.server.transport.lwm2m.server.LwM2MFirmwareUpdateStrategy.OBJ_5_TEMP_URL; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mOperationType.EXECUTE; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mOperationType.WRITE_REPLACE; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_INSTALL_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_NAME_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_PACKAGE_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_RESULT_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_UN_INSTALL_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_UPDATE; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_UPDATE_STATE_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_VER_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.convertObjectIdToVersionedId; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.toOtaPackageUpdateStatus; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.splitCamelCaseString; - -@Slf4j -public class LwM2mFwSwUpdate { - // 5/0/6 PkgName - // 9/0/0 PkgName - @Getter - @Setter - private volatile String currentTitle; - // 5/0/7 PkgVersion - // 9/0/1 PkgVersion - @Getter - @Setter - private volatile String currentVersion; - @Getter - @Setter - private volatile UUID currentId; - @Getter - @Setter - private volatile String stateUpdate; - @Getter - private String pathPackageId; - @Getter - private String pathStateId; - @Getter - private String pathResultId; - @Getter - private String pathNameId; - @Getter - private String pathVerId; - @Getter - private String pathInstallId; - @Getter - private String pathUnInstallId; - @Getter - private String wUpdate; - @Getter - @Setter - private volatile boolean infoFwSwUpdate = false; - private final OtaPackageType type; - - private final LwM2mUplinkMsgHandler handler; - - @Getter - LwM2mClient lwM2MClient; - @Getter - private final List pendingInfoRequestsStart; - @Getter - @Setter - private volatile int updateStrategy; - - public LwM2mFwSwUpdate(LwM2mUplinkMsgHandler handler, LwM2mClient lwM2MClient, OtaPackageType type, int updateStrategy) { - this.handler = handler; - this.lwM2MClient = lwM2MClient; - this.pendingInfoRequestsStart = new CopyOnWriteArrayList<>(); - this.type = type; - this.stateUpdate = null; - this.updateStrategy = updateStrategy; - this.initPathId(); - } - - private void initPathId() { - if (FIRMWARE.equals(this.type)) { - this.pathPackageId = LwM2MFirmwareUpdateStrategy.OBJ_5_BINARY.code == this.updateStrategy ? - FW_PACKAGE_5_ID : LwM2MFirmwareUpdateStrategy.OBJ_5_TEMP_URL.code == this.updateStrategy ? - FW_PACKAGE_URI_ID : FW_PACKAGE_19_ID; - this.pathStateId = FW_STATE_ID; - this.pathResultId = FW_RESULT_ID; - this.pathNameId = FW_NAME_ID; - this.pathVerId = FW_5_VER_ID; - this.pathInstallId = FW_UPDATE_ID; - this.wUpdate = FW_UPDATE; - } else if (SOFTWARE.equals(this.type)) { - this.pathPackageId = SW_PACKAGE_ID; - this.pathStateId = SW_UPDATE_STATE_ID; - this.pathResultId = SW_RESULT_ID; - this.pathNameId = SW_NAME_ID; - this.pathVerId = SW_VER_ID; - this.pathInstallId = SW_INSTALL_ID; - this.pathUnInstallId = SW_UN_INSTALL_ID; - this.wUpdate = SW_UPDATE; - } - } - - public void initReadValue(DefaultLwM2MUplinkMsgHandler handler, LwM2mDownlinkMsgHandler request, String pathIdVer) { - if (pathIdVer != null) { - this.pendingInfoRequestsStart.remove(pathIdVer); - } - if (this.pendingInfoRequestsStart.size() == 0) { - this.infoFwSwUpdate = false; -// if (!FAILED.name().equals(this.stateUpdate)) { - boolean conditionalStart = this.type.equals(FIRMWARE) ? this.conditionalFwUpdateStart(handler) : - this.conditionalSwUpdateStart(handler); - if (conditionalStart) { - this.writeFwSwWare(handler, request); - } -// } - } - } - - /** - * Send FsSw to Lwm2mClient: - * before operation Write: fw_state = DOWNLOADING - */ - public void writeFwSwWare(DefaultLwM2MUplinkMsgHandler handler, LwM2mDownlinkMsgHandler request) { - if (this.currentId != null) { - this.stateUpdate = OtaPackageUpdateStatus.INITIATED.name(); - this.sendLogs(handler, WRITE_REPLACE.name(), LOG_LWM2M_INFO, null); - String targetIdVer = convertObjectIdToVersionedId(this.pathPackageId, this.lwM2MClient.getRegistration()); - String fwMsg = String.format("%s: Start type operation %s paths: %s", LOG_LWM2M_INFO, - LwM2mOperationType.FW_UPDATE.name(), this.pathPackageId); - handler.logToTelemetry(fwMsg, lwM2MClient.getRegistration().getId()); - log.warn("8) Start firmware Update. Send save to: [{}] ver: [{}] path: [{}]", this.lwM2MClient.getDeviceName(), this.currentVersion, targetIdVer); - if (LwM2MFirmwareUpdateStrategy.OBJ_5_BINARY.code == this.updateStrategy) { - int chunkSize = 0; - int chunk = 0; - byte[] firmwareChunk = handler.otaPackageDataCache.get(this.currentId.toString(), chunkSize, chunk); - - TbLwM2MWriteReplaceRequest downlink = TbLwM2MWriteReplaceRequest.builder().versionedId(targetIdVer).value(firmwareChunk).timeout(handler.config.getTimeout()).build(); - request.sendWriteReplaceRequest(lwM2MClient, downlink, new TbLwM2MWriteResponseCallback(handler, lwM2MClient, targetIdVer)); - } else if (LwM2MFirmwareUpdateStrategy.OBJ_5_TEMP_URL.code == this.updateStrategy) { - String apiFont = "coap://176.36.143.9:5685"; - String uri = apiFont + "/" + FIRMWARE_UPDATE_COAP_RECOURSE + "/" + this.currentId.toString(); - log.warn("89) coapUri: [{}]", uri); - //TODO: user this.rpcRequest??? - TbLwM2MWriteReplaceRequest downlink = TbLwM2MWriteReplaceRequest.builder().versionedId(targetIdVer).value(uri).timeout(handler.config.getTimeout()).build(); - request.sendWriteReplaceRequest(lwM2MClient, downlink, new TbLwM2MWriteResponseCallback(handler, lwM2MClient, targetIdVer)); - } else if (LwM2MFirmwareUpdateStrategy.OBJ_19_BINARY.code == this.updateStrategy) { - - } - } else { - String msgError = "FirmWareId is null."; - log.warn("6) [{}]", msgError); -// if (this.rpcRequest != null) { -// TODO: refactor. -// handler.sentRpcResponse(this.rpcRequest, CONTENT.name(), msgError, LOG_LW2M_ERROR); -// } - log.error(msgError); - this.sendLogs(handler, WRITE_REPLACE.name(), LOG_LWM2M_ERROR, msgError); - } - } - - public void sendLogs(DefaultLwM2MUplinkMsgHandler handler, String typeOper, String typeInfo, String msgError) { -// this.sendSateOnThingsBoard(handler); - String msg = String.format("%s: %s, %s, pkgVer: %s: pkgName - %s state - %s.", - typeInfo, this.wUpdate, typeOper, this.currentVersion, this.currentTitle, this.stateUpdate); - if (LOG_LWM2M_ERROR.equals(typeInfo)) { - msg = String.format("%s Error: %s", msg, msgError); - } - handler.logToTelemetry(lwM2MClient, msg); - } - - - /** - * After inspection Update Result - * fw_state/sw_state = UPDATING - * send execute - */ - public void executeFwSwWare(DefaultLwM2MUplinkMsgHandler handler, LwM2mDownlinkMsgHandler request) { - this.sendLogs(handler, EXECUTE.name(), LOG_LWM2M_INFO, null); - //TODO: user this.rpcRequest??? - TbLwM2MExecuteRequest downlink = TbLwM2MExecuteRequest.builder().versionedId(pathInstallId).timeout(handler.config.getTimeout()).build(); - request.sendExecuteRequest(lwM2MClient, downlink, new TbLwM2MExecuteCallback(handler, lwM2MClient, pathInstallId)); - } - - /** - * Firmware start: Check if the version has changed and launch a new update. - * -ObjectId 5, Binary or ObjectId 5, URI - * -- If the result of the update - errors (more than 1) - This means that the previous. the update failed. - * - We launch the update regardless of the state of the firmware and its version. - * -- If the result of the update - errors (more than 1) - This means that the previous. the update failed. - * * ObjectId 5, Binary - * -- If the result of the update is not errors (equal to 1 or 0) and ver in Object 5 is not empty - it means that the previous update has passed. - * Compare current versions by equals. - * * ObjectId 5, URI - * -- If the result of the update is not errors (equal to 1 or 0) and ver in Object 5 is not empty - it means that the previous update has passed. - * Compare current versions by contains. - */ - private boolean conditionalFwUpdateStart(DefaultLwM2MUplinkMsgHandler handler) { - Long updateResultFw = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - String ver5 = (String) this.lwM2MClient.getResourceValue(null, this.pathVerId); - String pathName = (String) this.lwM2MClient.getResourceValue(null, this.pathNameId); - String ver3 = (String) this.lwM2MClient.getResourceValue(null, FW_3_VER_ID); - // #1/#2 - String fwMsg = null; - if ((this.currentVersion != null && ( - ver5 != null && ver5.equals(this.currentVersion) || - ver3 != null && ver3.contains(this.currentVersion) - )) || - (this.currentTitle != null && pathName != null && this.currentTitle.equals(pathName))) { - fwMsg = String.format("%s: The update was interrupted. The device has the same version: %s.", LOG_LWM2M_ERROR, - this.currentVersion); - } else if (updateResultFw != null && updateResultFw > UpdateResultFw.UPDATE_SUCCESSFULLY.code) { - fwMsg = String.format("%s: The update was interrupted. The device has the status UpdateResult: error (%d).", LOG_LWM2M_ERROR, - updateResultFw); - } - if (fwMsg != null) { - handler.logToTelemetry(fwMsg, lwM2MClient.getRegistration().getId()); - return false; - } else { - return true; - } - } - - - /** - * Before operation Execute inspection Update Result : - * 0 - Initial value - */ - public boolean conditionalFwExecuteStart() { - Long updateResult = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - return UpdateResultFw.INITIAL.code == updateResult; - } - - /** - * After operation Execute success inspection Update Result : - * 1 - "Firmware updated successfully" - */ - public boolean conditionalFwExecuteAfterSuccess() { - Long updateResult = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - return UpdateResultFw.UPDATE_SUCCESSFULLY.code == updateResult; - } - - /** - * After operation Execute success inspection Update Result : - * > 1 error: "Firmware updated successfully" - */ - public boolean conditionalFwExecuteAfterError() { - Long updateResult = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - return UpdateResultFw.UPDATE_SUCCESSFULLY.code < updateResult; - } - - /** - * Software start - * - If Update Result -errors (equal or more than 50) - This means that the previous. the update failed. - * * - We launch the update regardless of the state of the firmware and its version. - * - If Update Result is not errors (less than 50) and ver is not empty - This means that before. the update has passed. - * - If Update Result is not errors and ver is empty - This means that there was no update yet or before. UnInstall update - * - If Update Result is not errors and ver is not empty - This means that before unInstall update - * * - Check if the version has changed and launch a new update. - */ - private boolean conditionalSwUpdateStart(DefaultLwM2MUplinkMsgHandler handler) { - Long updateResultSw = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - // #1/#2 - return updateResultSw >= LwM2mTransportUtil.UpdateResultSw.NOT_ENOUGH_STORAGE.code || - ( - (updateResultSw <= LwM2mTransportUtil.UpdateResultSw.NOT_ENOUGH_STORAGE.code - ) && - ( - (this.currentVersion != null && !this.currentVersion.equals(this.lwM2MClient.getResourceValue(null, this.pathVerId))) || - (this.currentTitle != null && !this.currentTitle.equals(this.lwM2MClient.getResourceValue(null, this.pathNameId))) - ) - ); - } - - /** - * Before operation Execute inspection Update Result : - * 3 - Successfully Downloaded and package integrity verified - */ - public boolean conditionalSwUpdateExecute() { - Long updateResult = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - return LwM2mTransportUtil.UpdateResultSw.SUCCESSFULLY_DOWNLOADED_VERIFIED.code == updateResult; - } - - /** - * After finish operation Execute (success): - * -- inspection Update Result: - * ---- FW если Update Result == 1 ("Firmware updated successfully") или SW если Update Result == 2 ("Software successfully installed.") - * -- fw_state/sw_state = UPDATED - *

- * After finish operation Execute (error): - * -- inspection updateResult and send to thingsboard info about error - * --- send to telemetry ( key - this is name Update Result in model) ( - * -- fw_state/sw_state = FAILED - */ - public void finishFwSwUpdate(DefaultLwM2MUplinkMsgHandler handler, boolean success) { - Long updateResult = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - String value = FIRMWARE.equals(this.type) ? UpdateResultFw.fromUpdateResultFwByCode(updateResult.intValue()).type : - LwM2mTransportUtil.UpdateResultSw.fromUpdateResultSwByCode(updateResult.intValue()).type; - String key = splitCamelCaseString((String) this.lwM2MClient.getResourceNameByRezId(null, this.pathResultId)); - if (success) { - this.stateUpdate = OtaPackageUpdateStatus.UPDATED.name(); - this.sendLogs(handler, EXECUTE.name(), LOG_LWM2M_INFO, null); - } else { - this.stateUpdate = OtaPackageUpdateStatus.FAILED.name(); - this.sendLogs(handler, EXECUTE.name(), LOG_LWM2M_ERROR, value); - } - handler.helper.sendParametersOnThingsboardTelemetry( - handler.helper.getKvStringtoThingsboard(key, value), this.lwM2MClient.getSession()); - } - - /** - * After operation Execute success inspection Update Result : - * 2 - "Software successfully installed." - */ - public boolean conditionalSwExecuteAfterSuccess() { - Long updateResult = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - return LwM2mTransportUtil.UpdateResultSw.SUCCESSFULLY_INSTALLED.code == updateResult; - } - - /** - * After operation Execute success inspection Update Result : - * >= 50 - error "NOT_ENOUGH_STORAGE" - */ - public boolean conditionalSwExecuteAfterError() { - Long updateResult = (Long) this.lwM2MClient.getResourceValue(null, this.pathResultId); - return LwM2mTransportUtil.UpdateResultSw.NOT_ENOUGH_STORAGE.code <= updateResult; - } - -// private void observeStateUpdate(DefaultLwM2MUplinkMsgHandler handler, LwM2mDownlinkMsgHandler request) { -// request.sendAllRequest(lwM2MClient, -// convertPathFromObjectIdToIdVer(this.pathStateId, this.lwM2MClient.getRegistration()), OBSERVE, -// null, null, 0, null); -// request.sendAllRequest(lwM2MClient, -// convertPathFromObjectIdToIdVer(this.pathResultId, this.lwM2MClient.getRegistration()), OBSERVE, -// null, null, 0, null); -// } - - public void sendSateOnThingsBoard(DefaultLwM2MUplinkMsgHandler handler) { - if (StringUtils.trimToNull(this.stateUpdate) != null) { - List result = new ArrayList<>(); - TransportProtos.KeyValueProto.Builder kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(getAttributeKey(this.type, STATE)); - kvProto.setType(TransportProtos.KeyValueType.STRING_V).setStringV(stateUpdate); - result.add(kvProto.build()); - handler.helper.sendParametersOnThingsboardTelemetry(result, - handler.getSessionInfoOrCloseSession(this.lwM2MClient.getRegistration())); - } - } - - public void sendReadObserveInfo(LwM2mDownlinkMsgHandler request) { - this.infoFwSwUpdate = true; - this.pendingInfoRequestsStart.add(convertObjectIdToVersionedId( - this.pathStateId, this.lwM2MClient.getRegistration())); - this.pendingInfoRequestsStart.add(convertObjectIdToVersionedId( - this.pathResultId, this.lwM2MClient.getRegistration())); - this.pendingInfoRequestsStart.add(convertObjectIdToVersionedId( - FW_3_VER_ID, this.lwM2MClient.getRegistration())); - if (LwM2MFirmwareUpdateStrategy.OBJ_5_BINARY.code == this.updateStrategy || - LwM2MFirmwareUpdateStrategy.OBJ_19_BINARY.code == this.updateStrategy || - SOFTWARE.equals(this.type)) { - this.pendingInfoRequestsStart.add(convertObjectIdToVersionedId( - this.pathVerId, this.lwM2MClient.getRegistration())); - this.pendingInfoRequestsStart.add(convertObjectIdToVersionedId( - this.pathNameId, this.lwM2MClient.getRegistration())); - } - this.pendingInfoRequestsStart.forEach(versionedId -> { - TbLwM2MObserveRequest downlink = TbLwM2MObserveRequest.builder().versionedId(versionedId).build(); - request.sendObserveRequest(this.lwM2MClient, downlink, new TbLwM2MObserveCallback(handler, lwM2MClient, versionedId)); - }); - - } - - /** - * Before operation Execute (FwUpdate) inspection Update Result : - * - after finished operation Write result: success (FwUpdate): fw_state = DOWNLOADED - * - before start operation Execute (FwUpdate) Update Result = 0 - Initial value - * - start Execute (FwUpdate) - * After finished operation Execute (FwUpdate) inspection Update Result : - * - after start operation Execute (FwUpdate): fw_state = UPDATING - * - after success finished operation Execute (FwUpdate) Update Result == 1 ("Firmware updated successfully") - * - finished operation Execute (FwUpdate) - */ - public void updateStateOta(DefaultLwM2MUplinkMsgHandler handler, LwM2mDownlinkMsgHandler request, - Registration registration, String path, int value) { - if (OBJ_5_BINARY.code == this.getUpdateStrategy()) { - if ((convertObjectIdToVersionedId(FW_RESULT_ID, registration).equals(path))) { - if (DOWNLOADED.name().equals(this.getStateUpdate()) - && this.conditionalFwExecuteStart()) { - this.executeFwSwWare(handler, request); - } else if (UPDATING.name().equals(this.getStateUpdate()) - && this.conditionalFwExecuteAfterSuccess()) { - this.finishFwSwUpdate(handler, true); - } else if (UPDATING.name().equals(this.getStateUpdate()) - && this.conditionalFwExecuteAfterError()) { - this.finishFwSwUpdate(handler, false); - } - } - } else if (OBJ_5_TEMP_URL.code == this.getUpdateStrategy()) { - if (this.currentId != null && (convertObjectIdToVersionedId(FW_STATE_ID, registration).equals(path))) { - String state = toOtaPackageUpdateStatus(UpdateStateFw.fromStateFwByCode(value)).name(); - if (StringUtils.isNotEmpty(state) && !FAILED.name().equals(this.stateUpdate) && !state.equals(this.stateUpdate)) { - this.stateUpdate = state; - this.sendSateOnThingsBoard(handler); - } - if (value == UpdateStateFw.DOWNLOADED.code) { - this.executeFwSwWare(handler, request); - } - handler.firmwareUpdateState.put(lwM2MClient.getEndpoint(), value); - } - if ((convertObjectIdToVersionedId(FW_RESULT_ID, registration).equals(path))) { - if (this.currentId != null && value == UpdateResultFw.INITIAL.code) { - this.setStateUpdate(INITIATED.name()); - } else if (this.currentId != null && value == UpdateResultFw.UPDATE_SUCCESSFULLY.code) { - this.setStateUpdate(UPDATED.name()); - } else if (value > UpdateResultFw.UPDATE_SUCCESSFULLY.code) { - this.setStateUpdate(FAILED.name()); - } - this.sendSateOnThingsBoard(handler); - } - } else if (OBJ_19_BINARY.code == this.getUpdateStrategy()) { - - } - } -} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/common/LwM2MExecutorAwareService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/common/LwM2MExecutorAwareService.java new file mode 100644 index 0000000000..1f1dff784f --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/common/LwM2MExecutorAwareService.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.common; + +import org.thingsboard.common.util.ThingsBoardExecutors; + +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; + +public abstract class LwM2MExecutorAwareService { + + protected ExecutorService executor; + + protected abstract int getExecutorSize(); + + protected abstract String getExecutorName(); + + protected void init() { + this.executor = ThingsBoardExecutors.newWorkStealingPool(getExecutorSize(), getExecutorName()); + } + + public void destroy() { + if (executor != null) { + executor.shutdownNow(); + } + } + +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/AbstractTbLwM2MRequestCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/AbstractTbLwM2MRequestCallback.java index 764d5ff5ef..561b103277 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/AbstractTbLwM2MRequestCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/AbstractTbLwM2MRequestCallback.java @@ -17,30 +17,30 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_WARN; @Slf4j public abstract class AbstractTbLwM2MRequestCallback implements DownlinkRequestCallback { - protected final LwM2mUplinkMsgHandler handler; + protected final LwM2MTelemetryLogService logService; protected final LwM2mClient client; - protected AbstractTbLwM2MRequestCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client) { - this.handler = handler; + protected AbstractTbLwM2MRequestCallback(LwM2MTelemetryLogService logService, LwM2mClient client) { + this.logService = logService; this.client = client; } @Override public void onValidationError(String params, String msg) { log.trace("[{}] Request [{}] validation failed. Reason: {}", client.getEndpoint(), params, msg); - handler.logToTelemetry(client, String.format("[%s]: Request [%s] validation failed. Reason: %s", LOG_LWM2M_WARN, params, msg)); + logService.log(client, String.format("[%s]: Request [%s] validation failed. Reason: %s", LOG_LWM2M_WARN, params, msg)); } @Override public void onError(String params, Exception e) { log.trace("[{}] Request [{}] processing failed", client.getEndpoint(), params, e); - handler.logToTelemetry(client, String.format("[%s]: Request [%s] processing failed. Reason: %s", LOG_LWM2M_WARN, params, e)); + logService.log(client, String.format("[%s]: Request [%s] processing failed. Reason: %s", LOG_LWM2M_WARN, params, e)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java index da513a1934..52ef83bf8e 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DefaultLwM2mDownlinkMsgHandler.java @@ -26,7 +26,6 @@ import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.node.ObjectLink; import org.eclipse.leshan.core.node.codec.CodecException; import org.eclipse.leshan.core.observation.Observation; -import org.eclipse.leshan.core.request.CancelObservationRequest; import org.eclipse.leshan.core.request.ContentFormat; import org.eclipse.leshan.core.request.DeleteRequest; import org.eclipse.leshan.core.request.DiscoverRequest; @@ -45,7 +44,6 @@ import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteAttributesResponse; import org.eclipse.leshan.core.response.WriteResponse; import org.eclipse.leshan.core.util.Hex; -import org.eclipse.leshan.core.util.NamedThreadFactory; import org.eclipse.leshan.server.registration.Registration; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; @@ -54,17 +52,18 @@ import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.common.LwM2MExecutorAwareService; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -74,25 +73,38 @@ import static org.eclipse.leshan.core.attributes.Attribute.LESSER_THAN; import static org.eclipse.leshan.core.attributes.Attribute.MAXIMUM_PERIOD; import static org.eclipse.leshan.core.attributes.Attribute.MINIMUM_PERIOD; import static org.eclipse.leshan.core.attributes.Attribute.STEP; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.RESPONSE_REQUEST_CHANNEL; @Slf4j @Service @TbLwM2mTransportComponent @RequiredArgsConstructor -public class DefaultLwM2mDownlinkMsgHandler implements LwM2mDownlinkMsgHandler { - private ExecutorService responseRequestExecutor; +public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService implements LwM2mDownlinkMsgHandler { public LwM2mValueConverterImpl converter; private final LwM2mTransportContext context; private final LwM2MTransportServerConfig config; + private final LwM2MTelemetryLogService logService; @PostConstruct public void init() { + super.init(); this.converter = LwM2mValueConverterImpl.getInstance(); - responseRequestExecutor = Executors.newFixedThreadPool(this.config.getResponsePoolSize(), - new NamedThreadFactory(String.format("LwM2M %s channel response after request", RESPONSE_REQUEST_CHANNEL))); + } + + @PreDestroy + public void destroy() { + super.destroy(); + } + + @Override + protected int getExecutorSize() { + return config.getDownlinkPoolSize(); + } + + @Override + protected String getExecutorName() { + return "LwM2M Downlink"; } @Override @@ -221,7 +233,8 @@ public class DefaultLwM2mDownlinkMsgHandler implements LwM2mDownlinkMsgHandler { **/ Collection resources = client.getNewResourceForInstance(request.getVersionedId(), request.getValue(), this.config.getModelProvider(), this.converter); ResourceModel resourceModelWrite = client.getResourceModel(request.getVersionedId(), this.config.getModelProvider()); - WriteRequest downlink = new WriteRequest(WriteRequest.Mode.UPDATE, convertResourceModelTypeToContentFormat(client, resourceModelWrite.type), resultIds.getObjectId(), + ContentFormat contentFormat = request.getObjectContentFormat() != null ? request.getObjectContentFormat() : convertResourceModelTypeToContentFormat(client, resourceModelWrite.type); + WriteRequest downlink = new WriteRequest(WriteRequest.Mode.UPDATE, contentFormat, resultIds.getObjectId(), resultIds.getObjectInstanceId(), resources); sendRequest(client, downlink, request.getTimeout(), callback); } else if (resultIds.isObjectInstance()) { @@ -245,19 +258,24 @@ public class DefaultLwM2mDownlinkMsgHandler implements LwM2mDownlinkMsgHandler { private , T extends LwM2mResponse> void sendRequest(LwM2mClient client, R request, long timeoutInMs, DownlinkRequestCallback callback) { Registration registration = client.getRegistration(); - context.getServer().send(registration, request, timeoutInMs, response -> { - responseRequestExecutor.submit(() -> { - try { - callback.onSuccess(request, response); - } catch (Exception e) { - log.error("[{}] failed to process successful response [{}] ", registration.getEndpoint(), response, e); - } + try { + logService.log(client, String.format("[%s][%s] Sending request: %s to %s", registration.getId(), registration.getSocketAddress(), request.getClass().getSimpleName(), request.getPath())); + context.getServer().send(registration, request, timeoutInMs, response -> { + executor.submit(() -> { + try { + callback.onSuccess(request, response); + } catch (Exception e) { + log.error("[{}] failed to process successful response [{}] ", registration.getEndpoint(), response, e); + } + }); + }, e -> { + executor.submit(() -> { + callback.onError(JacksonUtil.toString(request), e); + }); }); - }, e -> { - responseRequestExecutor.submit(() -> { - callback.onError(JacksonUtil.toString(request), e); - }); - }); + } catch (Exception e) { + callback.onError(JacksonUtil.toString(request), e); + } } private WriteRequest getWriteRequestSingleResource(ResourceModel.Type type, ContentFormat contentFormat, int objectId, int instanceId, int resourceId, Object value) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java index 42f2a1baec..f90e0f8b97 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelAllObserveCallback.java @@ -17,21 +17,21 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; -import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; @Slf4j public class TbLwM2MCancelAllObserveCallback extends AbstractTbLwM2MRequestCallback { - public TbLwM2MCancelAllObserveCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client) { - super(handler, client); + public TbLwM2MCancelAllObserveCallback(LwM2MTelemetryLogService logService, LwM2mClient client) { + super(logService, client); } @Override public void onSuccess(TbLwM2MCancelAllRequest request, Integer canceledSubscriptionsCount) { log.trace("[{}] Cancel of all observations was successful: {}", client.getEndpoint(), canceledSubscriptionsCount); - handler.logToTelemetry(client, String.format("[%s]: Cancel of all observations was successful. Result: [%s]", LOG_LWM2M_INFO, canceledSubscriptionsCount)); + logService.log(client, String.format("[%s]: Cancel of all observations was successful. Result: [%s]", LOG_LWM2M_INFO, canceledSubscriptionsCount)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java index eec33963d5..311be03462 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MCancelObserveCallback.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; @@ -27,15 +28,15 @@ public class TbLwM2MCancelObserveCallback extends AbstractTbLwM2MRequestCallback private final String versionedId; - public TbLwM2MCancelObserveCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String versionedId) { - super(handler, client); + public TbLwM2MCancelObserveCallback(LwM2MTelemetryLogService logService, LwM2mClient client, String versionedId) { + super(logService, client); this.versionedId = versionedId; } @Override public void onSuccess(TbLwM2MCancelObserveRequest request, Integer canceledSubscriptionsCount) { log.trace("[{}] Cancel observation of [{}] successful: {}", client.getEndpoint(), versionedId, canceledSubscriptionsCount); - handler.logToTelemetry(client, String.format("[%s]: Cancel Observe for [%s] successful. Result: [%s]", LOG_LWM2M_INFO, versionedId, canceledSubscriptionsCount)); + logService.log(client, String.format("[%s]: Cancel Observe for [%s] successful. Result: [%s]", LOG_LWM2M_INFO, versionedId, canceledSubscriptionsCount)); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDeleteCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDeleteCallback.java index 32c82404c6..4c0a758572 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDeleteCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDeleteCallback.java @@ -18,12 +18,13 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import org.eclipse.leshan.core.request.DeleteRequest; import org.eclipse.leshan.core.response.DeleteResponse; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; public class TbLwM2MDeleteCallback extends TbLwM2MTargetedCallback { - public TbLwM2MDeleteCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client, targetId); + public TbLwM2MDeleteCallback(LwM2MTelemetryLogService logService, LwM2mClient client, String targetId) { + super(logService, client, targetId); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDiscoverCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDiscoverCallback.java index 5ae20e064d..41172f0599 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDiscoverCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MDiscoverCallback.java @@ -17,13 +17,14 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import org.eclipse.leshan.core.request.DiscoverRequest; import org.eclipse.leshan.core.response.DiscoverResponse; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; public class TbLwM2MDiscoverCallback extends TbLwM2MTargetedCallback { - public TbLwM2MDiscoverCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client, targetId); + public TbLwM2MDiscoverCallback(LwM2MTelemetryLogService logService, LwM2mClient client, String targetId) { + super(logService, client, targetId); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java index d2aed6a4fd..992cf6d616 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MExecuteCallback.java @@ -17,13 +17,14 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import org.eclipse.leshan.core.request.ExecuteRequest; import org.eclipse.leshan.core.response.ExecuteResponse; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; public class TbLwM2MExecuteCallback extends TbLwM2MTargetedCallback { - public TbLwM2MExecuteCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client, targetId); + public TbLwM2MExecuteCallback(LwM2MTelemetryLogService logService, LwM2mClient client, String targetId) { + super(logService, client, targetId); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java index 7fca29aa8c..dde49c870d 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MObserveCallback.java @@ -18,14 +18,15 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.request.ObserveRequest; import org.eclipse.leshan.core.response.ObserveResponse; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; @Slf4j -public class TbLwM2MObserveCallback extends TbLwM2MTargetedCallback { +public class TbLwM2MObserveCallback extends TbLwM2MUplinkTargetedCallback { - public TbLwM2MObserveCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client, targetId); + public TbLwM2MObserveCallback(LwM2mUplinkMsgHandler handler, LwM2MTelemetryLogService logService, LwM2mClient client, String targetId) { + super(handler, logService, client, targetId); } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java index 657fd04fd0..59247b103b 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MReadCallback.java @@ -19,13 +19,14 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.request.ReadRequest; import org.eclipse.leshan.core.response.ReadResponse; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; @Slf4j -public class TbLwM2MReadCallback extends TbLwM2MTargetedCallback { +public class TbLwM2MReadCallback extends TbLwM2MUplinkTargetedCallback { - public TbLwM2MReadCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client, targetId); + public TbLwM2MReadCallback(LwM2mUplinkMsgHandler handler, LwM2MTelemetryLogService logService, LwM2mClient client, String targetId) { + super(handler, logService, client, targetId); } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MTargetedCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MTargetedCallback.java index 258bec53ac..373c882ec4 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MTargetedCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MTargetedCallback.java @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; @@ -26,8 +27,8 @@ public abstract class TbLwM2MTargetedCallback extends AbstractTbLwM2MReque protected final String versionedId; - public TbLwM2MTargetedCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String versionedId) { - super(handler, client); + public TbLwM2MTargetedCallback(LwM2MTelemetryLogService logService, LwM2mClient client, String versionedId) { + super(logService, client); this.versionedId = versionedId; } @@ -36,7 +37,7 @@ public abstract class TbLwM2MTargetedCallback extends AbstractTbLwM2MReque //TODO convert camelCase to "camel case" using .split("(? extends TbLwM2MTargetedCallback { + + protected LwM2mUplinkMsgHandler handler; + + public TbLwM2MUplinkTargetedCallback(LwM2mUplinkMsgHandler handler, LwM2MTelemetryLogService logService, LwM2mClient client, String versionedId) { + super(logService, client, versionedId); + this.handler = handler; + } + +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteAttributesCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteAttributesCallback.java index 372534fa7a..e346ad965a 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteAttributesCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteAttributesCallback.java @@ -18,12 +18,13 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import org.eclipse.leshan.core.request.WriteAttributesRequest; import org.eclipse.leshan.core.response.WriteAttributesResponse; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; public class TbLwM2MWriteAttributesCallback extends TbLwM2MTargetedCallback { - public TbLwM2MWriteAttributesCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client, targetId); + public TbLwM2MWriteAttributesCallback(LwM2MTelemetryLogService logService, LwM2mClient client, String targetId) { + super(logService, client, targetId); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java index 89586bd113..4746077f19 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/TbLwM2MWriteResponseCallback.java @@ -18,12 +18,13 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; import org.eclipse.leshan.core.request.WriteRequest; import org.eclipse.leshan.core.response.WriteResponse; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; -public class TbLwM2MWriteResponseCallback extends TbLwM2MTargetedCallback { +public class TbLwM2MWriteResponseCallback extends TbLwM2MUplinkTargetedCallback { - public TbLwM2MWriteResponseCallback(LwM2mUplinkMsgHandler handler, LwM2mClient client, String targetId) { - super(handler, client, targetId); + public TbLwM2MWriteResponseCallback(LwM2mUplinkMsgHandler handler, LwM2MTelemetryLogService logService, LwM2mClient client, String targetId) { + super(handler, logService, client, targetId); } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java new file mode 100644 index 0000000000..421f4cf3a6 --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.log; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; +import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; + +import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_TELEMETRY; + +@Slf4j +@Service +@TbLwM2mTransportComponent +@RequiredArgsConstructor +public class DefaultLwM2MTelemetryLogService implements LwM2MTelemetryLogService { + + private final LwM2mClientContext clientContext; + private final LwM2mTransportServerHelper helper; + + /** + * @param logMsg - text msg + * @param registrationId - Id of Registration LwM2M Client + */ + @Override + public void log(String registrationId, String logMsg) { + log(clientContext.getClientByRegistrationId(registrationId), logMsg); + } + + @Override + public void log(LwM2mClient client, String logMsg) { + if (logMsg != null && client != null && client.getSession() != null) { + if (logMsg.length() > 1024) { + logMsg = logMsg.substring(0, 1024); + } + this.helper.sendParametersOnThingsboardTelemetry(this.helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), client.getSession()); + } + } + +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/LwM2MTelemetryLogService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/LwM2MTelemetryLogService.java new file mode 100644 index 0000000000..ff8543303e --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/LwM2MTelemetryLogService.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.log; + +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; + +public interface LwM2MTelemetryLogService { + + void log(LwM2mClient client, String msg); + + void log(String registrationId, String msg); + +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java index 3f86131320..d44770e591 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java @@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.DonAsynchron; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cache.ota.OtaPackageDataCache; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.ota.OtaPackageKey; @@ -41,33 +40,35 @@ import org.thingsboard.server.transport.lwm2m.server.UpdateStateFw; import org.thingsboard.server.transport.lwm2m.server.attributes.LwM2MAttributesService; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; +import org.thingsboard.server.transport.lwm2m.server.common.LwM2MExecutorAwareService; import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MExecuteCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MExecuteRequest; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteReplaceRequest; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteResponseCallback; -import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteUpdateRequest; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import static org.thingsboard.server.common.data.ota.OtaPackageKey.STATE; import static org.thingsboard.server.common.data.ota.OtaPackageUtil.getAttributeKey; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FIRMWARE_UPDATE_COAP_RECOURSE; +import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_TELEMETRY; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.convertObjectIdToVersionedId; @Slf4j @Service @TbLwM2mTransportComponent @RequiredArgsConstructor -public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { +public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService implements LwM2MOtaUpdateService { public static final String FIRMWARE_VERSION = getAttributeKey(OtaPackageType.FIRMWARE, OtaPackageKey.VERSION); public static final String FIRMWARE_TITLE = getAttributeKey(OtaPackageType.FIRMWARE, OtaPackageKey.TITLE); @@ -93,8 +94,8 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { private final LwM2mUplinkMsgHandler uplinkHandler; private final LwM2mDownlinkMsgHandler downlinkHandler; private final OtaPackageDataCache otaPackageDataCache; + private final LwM2MTelemetryLogService logService; private final LwM2mTransportServerHelper helper; - private ExecutorService executor; @Autowired @Lazy @@ -102,8 +103,22 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { @PostConstruct public void init() { - //TODO: define parallelism in constant - executor = ThingsBoardExecutors.newWorkStealingPool(4, "LwM2M OTA Updates"); + super.init(); + } + + @PreDestroy + public void destroy() { + super.destroy(); + } + + @Override + protected int getExecutorSize() { + return config.getOtaPoolSize(); + } + + @Override + protected String getExecutorName() { + return "LwM2M OTA"; } @Override @@ -112,7 +127,6 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { //TODO: check that the client supports FW and SW by checking the supported objects in the model. List attributesToFetch = new ArrayList<>(); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); - fwInfo.setSupported(client.isValidObjectVersion(FW_NAME_ID) || client.isValidObjectVersion(FW_VER_ID)); if (fwInfo.isSupported()) { attributesToFetch.add(FIRMWARE_TITLE); attributesToFetch.add(FIRMWARE_VERSION); @@ -138,6 +152,14 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { } } + @Override + public void forceFirmwareUpdate(LwM2mClient client) { + LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); + fwInfo.setRetryAttempts(0); + fwInfo.setFailedPackageId(null); + startFirmwareUpdateIfNeeded(client, fwInfo); + } + @Override public void onTargetFirmwareUpdate(LwM2mClient client, String newFirmwareTitle, String newFirmwareVersion, Optional newFirmwareUrl) { LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); @@ -147,33 +169,48 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { @Override public void onCurrentFirmwareNameUpdate(LwM2mClient client, String name) { + log.debug("[{}] Current fw name: {}", client.getEndpoint(), name); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); fwInfo.setCurrentName(name); } @Override - public void onCurrentFirmwareVersionUpdate(LwM2mClient client, String version) { + public void onCurrentFirmwareVersion3Update(LwM2mClient client, String version) { + log.debug("[{}] Current fw version: {}", client.getEndpoint(), version); + LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); + fwInfo.setCurrentVersion3(version); + } + + @Override + public void onCurrentFirmwareVersion5Update(LwM2mClient client, String version) { + log.debug("[{}] Current fw version: {}", client.getEndpoint(), version); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); - fwInfo.setCurrentVersion(version); + fwInfo.setCurrentVersion5(version); } @Override - public void onCurrentFirmwareStateUpdate(LwM2mClient client, Long state) { + public void onCurrentFirmwareStateUpdate(LwM2mClient client, Long stateCode) { + log.debug("[{}] Current fw state: {}", client.getEndpoint(), stateCode); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); - UpdateStateFw newState = UpdateStateFw.fromStateFwByCode(state.intValue()); - if (UpdateStateFw.DOWNLOADED.equals(newState)) { + UpdateStateFw state = UpdateStateFw.fromStateFwByCode(stateCode.intValue()); + if (UpdateStateFw.DOWNLOADED.equals(state)) { executeFwUpdate(client); } - fwInfo.setUpdateState(newState); - sendStateUpdateToTelemetry(client, fwInfo, LwM2mTransportUtil.toOtaPackageUpdateStatus(newState)); + fwInfo.setUpdateState(state); + Optional status = LwM2mTransportUtil.toOtaPackageUpdateStatus(state); + status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo, + otaStatus, "Firmware Update State: " + state.name())); } @Override public void onCurrentFirmwareResultUpdate(LwM2mClient client, Long code) { + log.debug("[{}] Current fw result: {}", client.getEndpoint(), code); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); UpdateResultFw result = UpdateResultFw.fromUpdateResultFwByCode(code.intValue()); - sendStateUpdateToTelemetry(client, fwInfo, LwM2mTransportUtil.toOtaPackageUpdateStatus(result)); - if (result.isAgain && fwInfo.getRetryAttempts() <= 2) { + Optional status = LwM2mTransportUtil.toOtaPackageUpdateStatus(result); + status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo, + otaStatus, "Firmware Update Result: " + result.name())); + if (result.isAgain() && fwInfo.getRetryAttempts() <= 2) { fwInfo.setRetryAttempts(fwInfo.getRetryAttempts() + 1); startFirmwareUpdateIfNeeded(client, fwInfo); } else { @@ -183,6 +220,7 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { @Override public void onCurrentFirmwareDeliveryMethodUpdate(LwM2mClient client, Long value) { + log.debug("[{}] Current fw delivery method: {}", client.getEndpoint(), value); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); fwInfo.setDeliveryMethod(value.intValue()); } @@ -193,22 +231,29 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { } private void startFirmwareUpdateIfNeeded(LwM2mClient client, LwM2MClientOtaInfo fwInfo) { - if (fwInfo.isUpdateRequired()) { - if (StringUtils.isNotEmpty(fwInfo.getTargetUrl())) { - log.info("[{}] Starting update to [{}{}] using URL: {}", client.getEndpoint(), fwInfo.getTargetName(), fwInfo.getTargetVersion(), fwInfo.getTargetUrl()); - startFirmwareUpdateUsingUrl(client, fwInfo.getTargetUrl()); - } else { - startFirmwareUpdateUsingBinary(client, fwInfo); + try { + if (!fwInfo.isSupported()) { + log.debug("[{}] Fw update is not supported: {}", client.getEndpoint(), fwInfo); + sendStateUpdateToTelemetry(client, fwInfo, OtaPackageUpdateStatus.FAILED, "Client does not support firmware update or profile misconfiguration!"); + } else if (fwInfo.isUpdateRequired()) { + if (StringUtils.isNotEmpty(fwInfo.getTargetUrl())) { + log.debug("[{}] Starting update to [{}{}] using URL: {}", client.getEndpoint(), fwInfo.getTargetName(), fwInfo.getTargetVersion(), fwInfo.getTargetUrl()); + startFirmwareUpdateUsingUrl(client, fwInfo.getTargetUrl()); + } else { + log.debug("[{}] Starting update to [{}{}] using binary", client.getEndpoint(), fwInfo.getTargetName(), fwInfo.getTargetVersion()); + startFirmwareUpdateUsingBinary(client, fwInfo); + } } - } else if (fwInfo.isUpdateFailed()) { - sendStateUpdateToTelemetry(client, fwInfo, OtaPackageUpdateStatus.FAILED); + } catch (Exception e) { + log.warn("[{}] failed to update client: {}", client.getEndpoint(), fwInfo, e); + sendStateUpdateToTelemetry(client, fwInfo, OtaPackageUpdateStatus.FAILED, "Internal server error: " + e.getMessage()); } } private void startFirmwareUpdateUsingUrl(LwM2mClient client, String url) { String targetIdVer = convertObjectIdToVersionedId(FW_URL_ID, client.getRegistration()); - TbLwM2MWriteUpdateRequest request = TbLwM2MWriteUpdateRequest.builder().versionedId(targetIdVer).value(url).timeout(config.getTimeout()).build(); - downlinkHandler.sendWriteUpdateRequest(client, request, new TbLwM2MWriteResponseCallback(uplinkHandler, client, targetIdVer)); + TbLwM2MWriteReplaceRequest request = TbLwM2MWriteReplaceRequest.builder().versionedId(targetIdVer).value(url).timeout(config.getTimeout()).build(); + downlinkHandler.sendWriteReplaceRequest(client, request, new TbLwM2MWriteResponseCallback(uplinkHandler, logService, client, targetIdVer)); } public void startFirmwareUpdateUsingBinary(LwM2mClient client, LwM2MClientOtaInfo fwInfo) { @@ -217,41 +262,44 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { new TransportServiceCallback<>() { @Override public void onSuccess(TransportProtos.GetOtaPackageResponseMsg response) { - if (TransportProtos.ResponseStatus.SUCCESS.equals(response.getResponseStatus()) - && response.getType().equals(OtaPackageType.FIRMWARE.name())) { - UUID otaPackageId = new UUID(response.getOtaPackageIdMSB(), response.getOtaPackageIdLSB()); - LwM2MFirmwareUpdateStrategy strategy; - if (fwInfo.getDeliveryMethod() == null || fwInfo.getDeliveryMethod() == 2) { - strategy = fwInfo.getStrategy(); - } else { - strategy = fwInfo.getDeliveryMethod() == 0 ? LwM2MFirmwareUpdateStrategy.OBJ_5_TEMP_URL : LwM2MFirmwareUpdateStrategy.OBJ_5_BINARY; - } - switch (strategy) { - case OBJ_5_BINARY: - byte[] firmwareChunk = otaPackageDataCache.get(otaPackageId.toString(), 0, 0); - TbLwM2MWriteReplaceRequest writeRequest = TbLwM2MWriteReplaceRequest.builder().versionedId(versionedId) - .value(firmwareChunk).contentFormat(ContentFormat.OPAQUE) - .timeout(config.getTimeout()).build(); - downlinkHandler.sendWriteReplaceRequest(client, writeRequest, new TbLwM2MWriteResponseCallback(uplinkHandler, client, versionedId)); - break; - case OBJ_5_TEMP_URL: - startFirmwareUpdateUsingUrl(client, fwInfo.getBaseUrl() + "/" + FIRMWARE_UPDATE_COAP_RECOURSE + "/" + otaPackageId.toString()); - break; - default: - //TODO: send log to telemetry - } - } else { - //TODO: send log to telemetry - } + executor.submit(() -> doUpdateFirmwareUsingBinary(response, fwInfo, versionedId, client)); } @Override public void onError(Throwable e) { - log.trace("Failed to process firmwareUpdate ", e); + logService.log(client, "Failed to process firmware update: " + e.getMessage()); } }); } + private void doUpdateFirmwareUsingBinary(TransportProtos.GetOtaPackageResponseMsg response, LwM2MClientOtaInfo fwInfo, String versionedId, LwM2mClient client) { + if (TransportProtos.ResponseStatus.SUCCESS.equals(response.getResponseStatus())) { + UUID otaPackageId = new UUID(response.getOtaPackageIdMSB(), response.getOtaPackageIdLSB()); + LwM2MFirmwareUpdateStrategy strategy; + if (fwInfo.getDeliveryMethod() == null || fwInfo.getDeliveryMethod() == 2) { + strategy = fwInfo.getStrategy(); + } else { + strategy = fwInfo.getDeliveryMethod() == 0 ? LwM2MFirmwareUpdateStrategy.OBJ_5_TEMP_URL : LwM2MFirmwareUpdateStrategy.OBJ_5_BINARY; + } + switch (strategy) { + case OBJ_5_BINARY: + byte[] firmwareChunk = otaPackageDataCache.get(otaPackageId.toString(), 0, 0); + TbLwM2MWriteReplaceRequest writeRequest = TbLwM2MWriteReplaceRequest.builder().versionedId(versionedId) + .value(firmwareChunk).contentFormat(ContentFormat.OPAQUE) + .timeout(config.getTimeout()).build(); + downlinkHandler.sendWriteReplaceRequest(client, writeRequest, new TbLwM2MWriteResponseCallback(uplinkHandler, logService, client, versionedId)); + break; + case OBJ_5_TEMP_URL: + startFirmwareUpdateUsingUrl(client, fwInfo.getBaseUrl() + "/" + FIRMWARE_UPDATE_COAP_RECOURSE + "/" + otaPackageId.toString()); + break; + default: + sendStateUpdateToTelemetry(client, fwInfo, OtaPackageUpdateStatus.FAILED, "Unsupported strategy: " + strategy.name()); + } + } else { + sendStateUpdateToTelemetry(client, fwInfo, OtaPackageUpdateStatus.FAILED, "Failed to fetch OTA package: " + response.getResponseStatus()); + } + } + private TransportProtos.GetOtaPackageRequestMsg createOtaPackageRequestMsg(TransportProtos.SessionInfoProto sessionInfo, String nameFwSW) { return TransportProtos.GetOtaPackageRequestMsg.newBuilder() .setDeviceIdMSB(sessionInfo.getDeviceIdMSB()) @@ -264,7 +312,7 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { private void executeFwUpdate(LwM2mClient client) { TbLwM2MExecuteRequest request = TbLwM2MExecuteRequest.builder().versionedId(FW_EXECUTE_ID).timeout(config.getTimeout()).build(); - downlinkHandler.sendExecuteRequest(client, request, new TbLwM2MExecuteCallback(uplinkHandler, client, FW_EXECUTE_ID)); + downlinkHandler.sendExecuteRequest(client, request, new TbLwM2MExecuteCallback(logService, client, FW_EXECUTE_ID)); } private Optional getAttributeValue(List attrs, String keyName) { @@ -298,11 +346,14 @@ public class DefaultLwM2MOtaUpdateService implements LwM2MOtaUpdateService { } - private void sendStateUpdateToTelemetry(LwM2mClient client, LwM2MClientOtaInfo fwInfo, OtaPackageUpdateStatus status) { + private void sendStateUpdateToTelemetry(LwM2mClient client, LwM2MClientOtaInfo fwInfo, OtaPackageUpdateStatus status, String log) { List result = new ArrayList<>(); TransportProtos.KeyValueProto.Builder kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(getAttributeKey(fwInfo.getType(), STATE)); kvProto.setType(TransportProtos.KeyValueType.STRING_V).setStringV(status.name()); result.add(kvProto.build()); + kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(LOG_LWM2M_TELEMETRY); + kvProto.setType(TransportProtos.KeyValueType.STRING_V).setStringV(log); + result.add(kvProto.build()); helper.sendParametersOnThingsboardTelemetry(result, client.getSession()); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MClientOtaInfo.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MClientOtaInfo.java index 50b44f8d06..43d4f4acf5 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MClientOtaInfo.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MClientOtaInfo.java @@ -31,7 +31,6 @@ public class LwM2MClientOtaInfo { private final OtaPackageType type; private String baseUrl; - private boolean supported; private boolean targetFetchFailure; private String targetName; @@ -40,7 +39,8 @@ public class LwM2MClientOtaInfo { private boolean currentFetchFailure; private String currentName; - private String currentVersion; + private String currentVersion3; + private String currentVersion5; private Integer deliveryMethod; //TODO: use value from device if applicable; @@ -65,24 +65,27 @@ public class LwM2MClientOtaInfo { } public boolean isUpdateRequired() { - if (StringUtils.isEmpty(targetName) || - StringUtils.isEmpty(targetVersion) || - (StringUtils.isEmpty(currentName) && StringUtils.isEmpty(currentVersion))) { + if (StringUtils.isEmpty(targetName) || StringUtils.isEmpty(targetVersion) || !isSupported()) { return false; } else { String targetPackageId = getPackageId(targetName, targetVersion); - String currentPackageId = getPackageId(currentName, currentVersion); + String currentPackageIdUsingObject5 = getPackageId(currentName, currentVersion5); if (StringUtils.isNotEmpty(failedPackageId) && failedPackageId.equals(targetPackageId)) { return false; } else { - return !targetPackageId.equals(currentPackageId); + if (targetPackageId.equals(currentPackageIdUsingObject5)) { + return false; + } else if (StringUtils.isNotEmpty(currentVersion3)) { + return !currentVersion3.contains(targetPackageId); + } else { + return true; + } } } } - public boolean isUpdateFailed() { - return (StringUtils.isNotEmpty(targetName) && StringUtils.isNotEmpty(targetVersion) && - (StringUtils.isEmpty(currentName) && StringUtils.isEmpty(currentVersion))); + public boolean isSupported() { + return StringUtils.isNotEmpty(currentName) || StringUtils.isNotEmpty(currentVersion5) || StringUtils.isNotEmpty(currentVersion3); } public void setUpdateResult(UpdateResultFw updateResult) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MOtaUpdateService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MOtaUpdateService.java index 6386e94364..9c7905a5e8 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MOtaUpdateService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/LwM2MOtaUpdateService.java @@ -23,13 +23,17 @@ public interface LwM2MOtaUpdateService { void init(LwM2mClient client); + void forceFirmwareUpdate(LwM2mClient client); + void onTargetFirmwareUpdate(LwM2mClient client, String newFirmwareTitle, String newFirmwareVersion, Optional newFirmwareUrl); void onTargetSoftwareUpdate(LwM2mClient client, String newSoftwareTitle, String newSoftwareVersion); void onCurrentFirmwareNameUpdate(LwM2mClient client, String name); - void onCurrentFirmwareVersionUpdate(LwM2mClient client, String version); + void onCurrentFirmwareVersion3Update(LwM2mClient client, String version); + + void onCurrentFirmwareVersion5Update(LwM2mClient client, String version); void onCurrentFirmwareStateUpdate(LwM2mClient client, Long state); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java index 533458ce76..5de223fa2f 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java @@ -50,6 +50,7 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttrib import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteReplaceRequest; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteResponseCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteUpdateRequest; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import java.util.Map; @@ -69,6 +70,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { private final LwM2MTransportServerConfig config; private final LwM2mUplinkMsgHandler uplinkHandler; private final LwM2mDownlinkMsgHandler downlinkHandler; + private final LwM2MTelemetryLogService logService; private final Map rpcSubscriptions = new ConcurrentHashMap<>(); @Override @@ -148,14 +150,14 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { private void sendReadRequest(LwM2mClient client, TransportProtos.ToDeviceRpcRequestMsg requestMsg, String versionedId) { TbLwM2MReadRequest request = TbLwM2MReadRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MReadCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MReadCallback(uplinkHandler, logService, client, versionedId); var rpcCallback = new RpcReadResponseCallback<>(transportService, client, requestMsg, versionedId, mainCallback); downlinkHandler.sendReadRequest(client, request, rpcCallback); } private void sendObserveRequest(LwM2mClient client, TransportProtos.ToDeviceRpcRequestMsg requestMsg, String versionedId) { TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MObserveCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MObserveCallback(uplinkHandler, logService, client, versionedId); var rpcCallback = new RpcReadResponseCallback<>(transportService, client, requestMsg, versionedId, mainCallback); downlinkHandler.sendObserveRequest(client, request, rpcCallback); } @@ -172,14 +174,14 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { private void sendDiscoverRequest(LwM2mClient client, TransportProtos.ToDeviceRpcRequestMsg requestMsg, String versionedId) { TbLwM2MDiscoverRequest request = TbLwM2MDiscoverRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MDiscoverCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MDiscoverCallback(logService, client, versionedId); var rpcCallback = new RpcDiscoverCallback(transportService, client, requestMsg, mainCallback); downlinkHandler.sendDiscoverRequest(client, request, rpcCallback); } private void sendExecuteRequest(LwM2mClient client, TransportProtos.ToDeviceRpcRequestMsg requestMsg, String versionedId) { TbLwM2MExecuteRequest downlink = TbLwM2MExecuteRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MExecuteCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MExecuteCallback(logService, client, versionedId); var rpcCallback = new RpcEmptyResponseCallback<>(transportService, client, requestMsg, mainCallback); downlinkHandler.sendExecuteRequest(client, downlink, rpcCallback); } @@ -189,7 +191,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(versionedId) .attributes(requestBody.getAttributes()) .timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MWriteAttributesCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MWriteAttributesCallback(logService, client, versionedId); var rpcCallback = new RpcEmptyResponseCallback<>(transportService, client, requestMsg, mainCallback); downlinkHandler.sendWriteAttributesRequest(client, request, rpcCallback); } @@ -198,7 +200,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { RpcWriteUpdateRequest requestBody = JacksonUtil.fromString(requestMsg.getParams(), RpcWriteUpdateRequest.class); TbLwM2MWriteUpdateRequest.TbLwM2MWriteUpdateRequestBuilder builder = TbLwM2MWriteUpdateRequest.builder().versionedId(versionedId); builder.value(requestBody.getValue()).timeout(this.config.getTimeout()); - var mainCallback = new TbLwM2MWriteResponseCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MWriteResponseCallback(uplinkHandler, logService, client, versionedId); var rpcCallback = new RpcEmptyResponseCallback<>(transportService, client, requestMsg, mainCallback); downlinkHandler.sendWriteUpdateRequest(client, builder.build(), rpcCallback); } @@ -208,28 +210,28 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { TbLwM2MWriteReplaceRequest request = TbLwM2MWriteReplaceRequest.builder().versionedId(versionedId) .value(requestBody.getValue()) .timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MWriteResponseCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MWriteResponseCallback(uplinkHandler, logService, client, versionedId); var rpcCallback = new RpcEmptyResponseCallback<>(transportService, client, requestMsg, mainCallback); downlinkHandler.sendWriteReplaceRequest(client, request, rpcCallback); } private void sendCancelObserveRequest(LwM2mClient client, TransportProtos.ToDeviceRpcRequestMsg requestMsg, String versionedId) { TbLwM2MCancelObserveRequest downlink = TbLwM2MCancelObserveRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MCancelObserveCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MCancelObserveCallback(logService, client, versionedId); var rpcCallback = new RpcCancelObserveCallback(transportService, client, requestMsg, mainCallback); downlinkHandler.sendCancelObserveRequest(client, downlink, rpcCallback); } private void sendDeleteRequest(LwM2mClient client, TransportProtos.ToDeviceRpcRequestMsg requestMsg, String versionedId) { TbLwM2MDeleteRequest downlink = TbLwM2MDeleteRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MDeleteCallback(uplinkHandler, client, versionedId); + var mainCallback = new TbLwM2MDeleteCallback(logService, client, versionedId); var rpcCallback = new RpcEmptyResponseCallback<>(transportService, client, requestMsg, mainCallback); downlinkHandler.sendDeleteRequest(client, downlink, rpcCallback); } private void sendCancelAllObserveRequest(LwM2mClient client, TransportProtos.ToDeviceRpcRequestMsg requestMsg) { TbLwM2MCancelAllRequest downlink = TbLwM2MCancelAllRequest.builder().timeout(this.config.getTimeout()).build(); - var mainCallback = new TbLwM2MCancelAllObserveCallback(uplinkHandler, client); + var mainCallback = new TbLwM2MCancelAllObserveCallback(logService, client); var rpcCallback = new RpcCancelAllObserveCallback(transportService, client, requestMsg, mainCallback); downlinkHandler.sendCancelAllRequest(client, downlink, rpcCallback); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java index 61c4b077d9..5b61ffc768 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java @@ -44,8 +44,6 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.data.lwm2m.ObjectAttributes; import org.thingsboard.server.common.data.device.data.lwm2m.TelemetryMappingConfiguration; import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; -import org.thingsboard.server.common.data.id.OtaPackageId; -import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.ota.OtaPackageUtil; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; @@ -66,9 +64,9 @@ import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientState; import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientStateException; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; -import org.thingsboard.server.transport.lwm2m.server.client.LwM2mFwSwUpdate; import org.thingsboard.server.transport.lwm2m.server.client.ParametersAnalyzeResult; import org.thingsboard.server.transport.lwm2m.server.client.ResultsAddKeyValueProto; +import org.thingsboard.server.transport.lwm2m.server.common.LwM2MExecutorAwareService; import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveCallback; @@ -82,12 +80,14 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadCallbac import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadRequest; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore; import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -105,9 +105,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH; -import static org.thingsboard.server.common.data.ota.OtaPackageUpdateStatus.FAILED; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_3_VER_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_5_ID; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_5_VER_ID; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_DELIVERY_METHOD; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_NAME_ID; @@ -115,22 +113,17 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.F import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.FW_STATE_ID; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_ERROR; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_TELEMETRY; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_WARN; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.SW_ID; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.convertOtaUpdateValueToString; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.convertObjectIdToVersionedId; +import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.convertOtaUpdateValueToString; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.fromVersionedIdToObjectId; @Slf4j @Service @TbLwM2mTransportComponent -public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { +public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService implements LwM2mUplinkMsgHandler { - private ExecutorService registrationExecutor; - private ExecutorService updateRegistrationExecutor; - private ExecutorService unRegistrationExecutor; public LwM2mValueConverterImpl converter; private final TransportService transportService; @@ -138,6 +131,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { private final LwM2MAttributesService attributesService; private final LwM2MOtaUpdateService otaService; public final LwM2MTransportServerConfig config; + private final LwM2MTelemetryLogService logService; public final OtaPackageDataCache otaPackageDataCache; public final LwM2mTransportServerHelper helper; private final TbLwM2MDtlsSessionStore sessionStore; @@ -148,8 +142,10 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { public final Map firmwareUpdateState; public DefaultLwM2MUplinkMsgHandler(TransportService transportService, - LwM2MTransportServerConfig config, LwM2mTransportServerHelper helper, + LwM2MTransportServerConfig config, + LwM2mTransportServerHelper helper, LwM2mClientContext clientContext, + LwM2MTelemetryLogService logService, @Lazy LwM2MOtaUpdateService otaService, @Lazy LwM2MAttributesService attributesService, @Lazy LwM2MRpcRequestHandler rpcHandler, @@ -162,6 +158,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { this.config = config; this.helper = helper; this.clientContext = clientContext; + this.logService = logService; this.rpcHandler = rpcHandler; this.defaultLwM2MDownlinkMsgHandler = defaultLwM2MDownlinkMsgHandler; this.otaPackageDataCache = otaPackageDataCache; @@ -171,14 +168,27 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { } @PostConstruct - public void initAttributes() { + public void init() { + super.init(); this.context.getScheduler().scheduleAtFixedRate(this::reportActivity, new Random().nextInt((int) config.getSessionReportTimeout()), config.getSessionReportTimeout(), TimeUnit.MILLISECONDS); - this.registrationExecutor = ThingsBoardExecutors.newWorkStealingPool(this.config.getRegisteredPoolSize(), "LwM2M registration"); - this.updateRegistrationExecutor = ThingsBoardExecutors.newWorkStealingPool(this.config.getUpdateRegisteredPoolSize(), "LwM2M update registration"); - this.unRegistrationExecutor = ThingsBoardExecutors.newWorkStealingPool(this.config.getUnRegisteredPoolSize(), "LwM2M unRegistration"); this.converter = LwM2mValueConverterImpl.getInstance(); } + @PreDestroy + public void destroy() { + super.destroy(); + } + + @Override + protected String getExecutorName() { + return "LwM2M uplink"; + } + + @Override + protected int getExecutorSize() { + return config.getUplinkPoolSize(); + } + /** * Start registration device * Create session: Map, LwM2MClient> @@ -194,7 +204,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { * @param previousObservations - may be null */ public void onRegistered(Registration registration, Collection previousObservations) { - registrationExecutor.submit(() -> { + executor.submit(() -> { LwM2mClient lwM2MClient = this.clientContext.getClientByEndpoint(registration.getEndpoint()); try { log.warn("[{}] [{{}] Client: create after Registration", registration.getEndpoint(), registration.getId()); @@ -204,7 +214,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { log.info("[{}] Closing old session: {}", registration.getEndpoint(), new UUID(oldSessionInfo.get().getSessionIdMSB(), oldSessionInfo.get().getSessionIdLSB())); closeSession(oldSessionInfo.get()); } - this.logToTelemetry(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId()); + logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId()); SessionInfoProto sessionInfo = lwM2MClient.getSession(); transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo)); log.warn("40) sessionId [{}] Registering rpc subscription after Registration client", new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB())); @@ -215,8 +225,6 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build()) .build(); transportService.process(msg, null); - this.getInfoFirmwareUpdate(lwM2MClient); - this.getInfoSoftwareUpdate(lwM2MClient); this.initClientTelemetry(lwM2MClient); this.initAttributes(lwM2MClient); otaService.init(lwM2MClient); @@ -229,11 +237,11 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { // Race condition detected and the client was in progress of unregistration while new registration arrived. Let's try again. onRegistered(registration, previousObservations); } else { - this.logToTelemetry(lwM2MClient, LOG_LWM2M_WARN + ": Client registration failed due to invalid state: " + stateException.getState()); + logService.log(lwM2MClient, LOG_LWM2M_WARN + ": Client registration failed due to invalid state: " + stateException.getState()); } } catch (Throwable t) { log.error("[{}] endpoint [{}] error Unable registration.", registration.getEndpoint(), t); - this.logToTelemetry(lwM2MClient, LOG_LWM2M_WARN + ": Client registration failed due to: " + t.getMessage()); + logService.log(lwM2MClient, LOG_LWM2M_WARN + ": Client registration failed due to: " + t.getMessage()); } }); } @@ -244,7 +252,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { * @param registration - Registration LwM2M Client */ public void updatedReg(Registration registration) { - updateRegistrationExecutor.submit(() -> { + executor.submit(() -> { LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint()); try { log.warn("[{}] [{{}] Client: update after Registration", registration.getEndpoint(), registration.getId()); @@ -265,7 +273,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { } } catch (Throwable t) { log.error("[{}] endpoint [{}] error Unable update registration.", registration.getEndpoint(), t); - this.logToTelemetry(lwM2MClient, LOG_LWM2M_ERROR + String.format(": Client update Registration, %s", t.getMessage())); + logService.log(lwM2MClient, LOG_LWM2M_ERROR + String.format(": Client update Registration, %s", t.getMessage())); } }); } @@ -275,10 +283,10 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { * @param observations - !!! Warn: if have not finishing unReg, then this operation will be finished on next Client`s connect */ public void unReg(Registration registration, Collection observations) { - unRegistrationExecutor.submit(() -> { + executor.submit(() -> { LwM2mClient client = clientContext.getClientByEndpoint(registration.getEndpoint()); try { - this.logToTelemetry(client, LOG_LWM2M_INFO + ": Client unRegistration"); + logService.log(client, LOG_LWM2M_INFO + ": Client unRegistration"); clientContext.unregister(client, registration); SessionInfoProto sessionInfo = client.getSession(); if (sessionInfo != null) { @@ -292,7 +300,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { log.info("[{}] delete registration: [{}] {}.", registration.getEndpoint(), stateException.getState(), stateException.getMessage()); } catch (Throwable t) { log.error("[{}] endpoint [{}] error Unable un registration.", registration.getEndpoint(), t); - this.logToTelemetry(client, LOG_LWM2M_ERROR + String.format(": Client Unable un Registration, %s", t.getMessage())); + logService.log(client, LOG_LWM2M_ERROR + String.format(": Client Unable un Registration, %s", t.getMessage())); } }); } @@ -305,7 +313,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { @Override public void onSleepingDev(Registration registration) { log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint()); - this.logToTelemetry(clientContext.getClientByEndpoint(registration.getEndpoint()), LOG_LWM2M_INFO + ": Client is sleeping!"); + logService.log(clientContext.getClientByEndpoint(registration.getEndpoint()), LOG_LWM2M_INFO + ": Client is sleeping!"); //TODO: associate endpointId with device information. } @@ -403,29 +411,10 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { @Override public void onAwakeDev(Registration registration) { log.trace("[{}] [{}] Received endpoint Awake version event", registration.getId(), registration.getEndpoint()); - this.logToTelemetry(clientContext.getClientByEndpoint(registration.getEndpoint()), LOG_LWM2M_INFO + ": Client is awake!"); + logService.log(clientContext.getClientByEndpoint(registration.getEndpoint()), LOG_LWM2M_INFO + ": Client is awake!"); //TODO: associate endpointId with device information. } - /** - * @param logMsg - text msg - * @param registrationId - Id of Registration LwM2M Client - */ - @Override - public void logToTelemetry(String registrationId, String logMsg) { - logToTelemetry(clientContext.getClientByRegistrationId(registrationId), logMsg); - } - - @Override - public void logToTelemetry(LwM2mClient client, String logMsg) { - if (logMsg != null && client != null && client.getSession() != null) { - if (logMsg.length() > 1024) { - logMsg = logMsg.substring(0, 1024); - } - this.helper.sendParametersOnThingsboardTelemetry(this.helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), client.getSession()); - } - } - /** * #1 clientOnlyObserveAfterConnect == true * - Only Observe Request to the client marked as observe from the profile configuration. @@ -441,10 +430,6 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { Lwm2mDeviceProfileTransportConfiguration profile = clientContext.getProfile(lwM2MClient.getProfileId()); Set supportedObjects = clientContext.getSupportedIdVerInClient(lwM2MClient); if (supportedObjects != null && supportedObjects.size() > 0) { - if (LwM2mTransportUtil.LwM2MClientStrategy.CLIENT_STRATEGY_2.code == profile.getClientLwM2mSettings().getClientOnlyObserveAfterConnect()) { - // #2 - supportedObjects.forEach(versionedId -> sendReadRequest(lwM2MClient, versionedId)); - } // #1 this.sendReadRequests(lwM2MClient, profile, supportedObjects); this.sendObserveRequests(lwM2MClient, profile, supportedObjects); @@ -461,7 +446,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { CountDownLatch latch = new CountDownLatch(targetIds.size()); targetIds.forEach(versionedId -> sendReadRequest(lwM2MClient, versionedId, - new TbLwM2MLatchCallback<>(latch, new TbLwM2MReadCallback(this, lwM2MClient, versionedId)))); + new TbLwM2MLatchCallback<>(latch, new TbLwM2MReadCallback(this, logService, lwM2MClient, versionedId)))); try { latch.await(); } catch (InterruptedException e) { @@ -475,7 +460,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { CountDownLatch latch = new CountDownLatch(targetIds.size()); targetIds.forEach(targetId -> sendObserveRequest(lwM2MClient, targetId, - new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, lwM2MClient, targetId)))); + new TbLwM2MLatchCallback<>(latch, new TbLwM2MObserveCallback(this, logService, lwM2MClient, targetId)))); try { latch.await(); } catch (InterruptedException e) { @@ -501,11 +486,11 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { private void sendDiscoverRequest(LwM2mClient lwM2MClient, String targetId) { TbLwM2MDiscoverRequest request = TbLwM2MDiscoverRequest.builder().versionedId(targetId).timeout(this.config.getTimeout()).build(); - defaultLwM2MDownlinkMsgHandler.sendDiscoverRequest(lwM2MClient, request, new TbLwM2MDiscoverCallback(this, lwM2MClient, targetId)); + defaultLwM2MDownlinkMsgHandler.sendDiscoverRequest(lwM2MClient, request, new TbLwM2MDiscoverCallback(logService, lwM2MClient, targetId)); } private void sendReadRequest(LwM2mClient lwM2MClient, String versionedId) { - sendReadRequest(lwM2MClient, versionedId, new TbLwM2MReadCallback(this, lwM2MClient, versionedId)); + sendReadRequest(lwM2MClient, versionedId, new TbLwM2MReadCallback(this, logService, lwM2MClient, versionedId)); } private void sendReadRequest(LwM2mClient lwM2MClient, String versionedId, DownlinkRequestCallback callback) { @@ -514,7 +499,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { } private void sendObserveRequest(LwM2mClient lwM2MClient, String versionedId) { - sendObserveRequest(lwM2MClient, versionedId, new TbLwM2MObserveCallback(this, lwM2MClient, versionedId)); + sendObserveRequest(lwM2MClient, versionedId, new TbLwM2MObserveCallback(this, logService, lwM2MClient, versionedId)); } private void sendObserveRequest(LwM2mClient lwM2MClient, String versionedId, DownlinkRequestCallback callback) { @@ -524,12 +509,12 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { private void sendWriteAttributesRequest(LwM2mClient lwM2MClient, String targetId, ObjectAttributes params) { TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(targetId).attributes(params).timeout(this.config.getTimeout()).build(); - defaultLwM2MDownlinkMsgHandler.sendWriteAttributesRequest(lwM2MClient, request, new TbLwM2MWriteAttributesCallback(this, lwM2MClient, targetId)); + defaultLwM2MDownlinkMsgHandler.sendWriteAttributesRequest(lwM2MClient, request, new TbLwM2MWriteAttributesCallback(logService, lwM2MClient, targetId)); } private void sendCancelObserveRequest(String versionedId, LwM2mClient client) { TbLwM2MCancelObserveRequest request = TbLwM2MCancelObserveRequest.builder().versionedId(versionedId).timeout(this.config.getTimeout()).build(); - defaultLwM2MDownlinkMsgHandler.sendCancelObserveRequest(client, request, new TbLwM2MCancelObserveCallback(this, client, versionedId)); + defaultLwM2MDownlinkMsgHandler.sendCancelObserveRequest(client, request, new TbLwM2MCancelObserveCallback(logService, client, versionedId)); } private void updateObjectResourceValue(LwM2mClient client, LwM2mObject lwM2mObject, String pathIdVer) { @@ -565,9 +550,9 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { if (path.equals(convertObjectIdToVersionedId(FW_NAME_ID, registration))) { otaService.onCurrentFirmwareNameUpdate(lwM2MClient, (String) lwM2mResource.getValue()); } else if (path.equals(convertObjectIdToVersionedId(FW_3_VER_ID, registration))) { - otaService.onCurrentFirmwareVersionUpdate(lwM2MClient, (String) lwM2mResource.getValue()); + otaService.onCurrentFirmwareVersion3Update(lwM2MClient, (String) lwM2mResource.getValue()); } else if (path.equals(convertObjectIdToVersionedId(FW_5_VER_ID, registration))) { - otaService.onCurrentFirmwareVersionUpdate(lwM2MClient, (String) lwM2mResource.getValue()); + otaService.onCurrentFirmwareVersion5Update(lwM2MClient, (String) lwM2mResource.getValue()); } else if (path.equals(convertObjectIdToVersionedId(FW_STATE_ID, registration))) { otaService.onCurrentFirmwareStateUpdate(lwM2MClient, (Long) lwM2mResource.getValue()); } else if (path.equals(convertObjectIdToVersionedId(FW_RESULT_ID, registration))) { @@ -919,93 +904,7 @@ public class DefaultLwM2MUplinkMsgHandler implements LwM2mUplinkMsgHandler { DonAsynchron.withCallback(attributesService.getSharedAttributes(lwM2MClient, keysToFetch), v -> attributesService.onAttributesUpdate(lwM2MClient, v), t -> log.error("[{}] Failed to get attributes", lwM2MClient.getEndpoint(), t), - registrationExecutor); - } - } - - public void getInfoFirmwareUpdate(LwM2mClient lwM2MClient) { - if (lwM2MClient.getRegistration().getSupportedVersion(FW_5_ID) != null) { - SessionInfoProto sessionInfo = this.getSessionInfo(lwM2MClient); - if (sessionInfo != null) { - DefaultLwM2MUplinkMsgHandler handler = this; - this.transportService.process(sessionInfo, createOtaPackageRequestMsg(sessionInfo, OtaPackageType.FIRMWARE.name()), - new TransportServiceCallback<>() { - @Override - public void onSuccess(TransportProtos.GetOtaPackageResponseMsg response) { - if (TransportProtos.ResponseStatus.SUCCESS.equals(response.getResponseStatus()) - && response.getType().equals(OtaPackageType.FIRMWARE.name())) { - LwM2mFwSwUpdate fwUpdate = lwM2MClient.getFwUpdate(DefaultLwM2MUplinkMsgHandler.this, clientContext); -// if (rpcRequest != null) { -// fwUpdate.setStateUpdate(INITIATED.name()); -// } - if (!FAILED.name().equals(fwUpdate.getStateUpdate())) { - log.warn("7) firmware start with ver: [{}]", response.getVersion()); -// fwUpdate.setRpcRequest(rpcRequest); - fwUpdate.setCurrentVersion(response.getVersion()); - fwUpdate.setCurrentTitle(response.getTitle()); - fwUpdate.setCurrentId(new UUID(response.getOtaPackageIdMSB(), response.getOtaPackageIdLSB())); -// if (rpcRequest == null) { - fwUpdate.sendReadObserveInfo(defaultLwM2MDownlinkMsgHandler); -// } else { -// fwUpdate.writeFwSwWare(handler, defaultLwM2MDownlinkMsgHandler); -// } - } else { - String msgError = String.format("OtaPackage device: %s, version: %s, stateUpdate: %s", - lwM2MClient.getDeviceName(), response.getVersion(), fwUpdate.getStateUpdate()); - log.warn("7_1 [{}]", msgError); - } - } else { - String msgError = String.format("OtaPackage device: %s, responseStatus: %s", - lwM2MClient.getDeviceName(), response.getResponseStatus().toString()); - log.trace(msgError); -// if (rpcRequest != null) { - //TODO: refactor -// sendErrorRpcResponse(rpcRequest, msgError, sessionInfo); -// } - } - } - - @Override - public void onError(Throwable e) { - log.trace("Failed to process firmwareUpdate ", e); - } - }); - } - } - } - - public void getInfoSoftwareUpdate(LwM2mClient lwM2MClient) { - if (lwM2MClient.getRegistration().getSupportedVersion(SW_ID) != null) { - SessionInfoProto sessionInfo = this.getSessionInfo(lwM2MClient); - if (sessionInfo != null) { - DefaultLwM2MUplinkMsgHandler handler = this; - transportService.process(sessionInfo, createOtaPackageRequestMsg(sessionInfo, OtaPackageType.SOFTWARE.name()), - new TransportServiceCallback<>() { - @Override - public void onSuccess(TransportProtos.GetOtaPackageResponseMsg response) { - if (TransportProtos.ResponseStatus.SUCCESS.equals(response.getResponseStatus()) - && response.getType().equals(OtaPackageType.SOFTWARE.name())) { -// lwM2MClient.getSwUpdate().setRpcRequest(rpcRequest); - lwM2MClient.getSwUpdate().setCurrentVersion(response.getVersion()); - lwM2MClient.getSwUpdate().setCurrentTitle(response.getTitle()); - lwM2MClient.getSwUpdate().setCurrentId(new OtaPackageId(new UUID(response.getOtaPackageIdMSB(), response.getOtaPackageIdLSB())).getId()); - lwM2MClient.getSwUpdate().sendReadObserveInfo(defaultLwM2MDownlinkMsgHandler); -// if (rpcRequest == null) { - lwM2MClient.getSwUpdate().sendReadObserveInfo(defaultLwM2MDownlinkMsgHandler); -// } else { -// lwM2MClient.getSwUpdate().writeFwSwWare(handler, defaultLwM2MDownlinkMsgHandler); -// } - } else { - log.trace("Software [{}] [{}]", lwM2MClient.getDeviceName(), response.getResponseStatus().toString()); - } - } - - @Override - public void onError(Throwable e) { - log.trace("Failed to process softwareUpdate ", e); - } - }); - } + executor); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/LwM2mUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/LwM2mUplinkMsgHandler.java index 517db388e4..372daa7052 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/LwM2mUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/LwM2mUplinkMsgHandler.java @@ -52,10 +52,6 @@ public interface LwM2mUplinkMsgHandler { void onAwakeDev(Registration registration); - void logToTelemetry(LwM2mClient client, String msg); - - void logToTelemetry(String registrationId, String msg); - void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request); void onToTransportUpdateCredentials(TransportProtos.ToTransportUpdateCredentialsProto updateCredentials); diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index d81b28a5ff..d7e97231e0 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -142,12 +142,10 @@ transport: timeout: "${LWM2M_TIMEOUT:120000}" recommended_ciphers: "${LWM2M_RECOMMENDED_CIPHERS:false}" recommended_supported_groups: "${LWM2M_RECOMMENDED_SUPPORTED_GROUPS:true}" - response_pool_size: "${LWM2M_RESPONSE_POOL_SIZE:100}" - registered_pool_size: "${LWM2M_REGISTERED_POOL_SIZE:10}" - registration_store_pool_size: "${LWM2M_REGISTRATION_STORE_POOL_SIZE:100}" + uplink_pool_size: "${LWM2M_UPLINK_POOL_SIZE:10}" + downlink_pool_size: "${LWM2M_DOWNLINK_POOL_SIZE:10}" + ota_pool_size: "${LWM2M_OTA_POOL_SIZE:10}" clean_period_in_sec: "${LWM2M_CLEAN_PERIOD_IN_SEC:2}" - update_registered_pool_size: "${LWM2M_UPDATE_REGISTERED_POOL_SIZE:10}" - un_registered_pool_size: "${LWM2M_UN_REGISTERED_POOL_SIZE:10}" log_max_length: "${LWM2M_LOG_MAX_LENGTH:100}" # Use redis for Security and Registration stores redis.enabled: "${LWM2M_REDIS_ENABLED:false}"