From 6cdb508c71041fb15757ef2e7d9182e48ea18601 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 27 Jun 2019 10:00:17 +0300 Subject: [PATCH] Claiming devices implementation --- .../server/controller/BaseController.java | 31 +++- .../server/controller/DeviceController.java | 101 ++++++++++- .../controller/claim/data/ClaimRequest.java | 25 +++ ...ions.java => CustomerUserPermissions.java} | 50 ++--- .../security/permission/Operation.java | 2 +- .../transport/LocalTransportService.java | 42 ++++- .../src/main/resources/thingsboard.yml | 8 + .../server/common/data/CacheConstants.java | 1 + .../server/common/data/DataConstants.java | 4 + .../common/msg/session/FeatureType.java | 2 +- .../common/msg/session/SessionMsgType.java | 4 +- .../transport/coap/CoapTransportResource.java | 11 +- .../coap/adaptors/CoapTransportAdaptor.java | 3 +- .../coap/adaptors/JsonCoapAdaptor.java | 44 +++-- .../transport/http/DeviceApiController.java | 16 +- .../server/transport/mqtt/MqttTopics.java | 2 + .../transport/mqtt/MqttTransportHandler.java | 8 +- .../mqtt/adaptors/JsonMqttAdaptor.java | 24 ++- .../mqtt/adaptors/MqttTransportAdaptor.java | 4 + .../mqtt/session/GatewaySessionHandler.java | 39 +++- .../common/transport/TransportService.java | 15 +- .../transport/adaptor/JsonConverter.java | 93 +++++++--- .../service/AbstractTransportService.java | 15 +- .../service/RemoteTransportService.java | 35 +++- .../src/main/proto/transport.proto | 8 + .../dao/device/ClaimDevicesService.java | 35 ++++ .../dao/device/ClaimDevicesServiceImpl.java | 171 ++++++++++++++++++ .../server/dao/device/claim/ClaimData.java | 28 +++ .../dao/device/claim/ClaimResponse.java | 24 +++ .../resources/application-test.properties | 7 +- 30 files changed, 743 insertions(+), 109 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/controller/claim/data/ClaimRequest.java rename application/src/main/java/org/thingsboard/server/service/security/permission/{CustomerUserPremissions.java => CustomerUserPermissions.java} (78%) create mode 100644 dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesService.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimData.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimResponse.java diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 1d879ec99f..3fc809c424 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -28,7 +28,16 @@ import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.web.bind.annotation.ExceptionHandler; import org.thingsboard.server.actors.service.ActorService; -import org.thingsboard.server.common.data.*; +import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.Dashboard; +import org.thingsboard.server.common.data.DashboardInfo; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.HasName; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmId; import org.thingsboard.server.common.data.alarm.AlarmInfo; @@ -36,7 +45,19 @@ import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.id.WidgetTypeId; +import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.page.TextPageLink; @@ -45,7 +66,6 @@ import org.thingsboard.server.common.data.plugin.ComponentDescriptor; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleNode; -import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.widget.WidgetType; import org.thingsboard.server.common.data.widget.WidgetsBundle; import org.thingsboard.server.common.msg.TbMsg; @@ -53,13 +73,13 @@ import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; -import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.audit.AuditLogService; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; +import org.thingsboard.server.dao.device.ClaimDevicesService; import org.thingsboard.server.dao.device.DeviceCredentialsService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.entityview.EntityViewService; @@ -162,6 +182,9 @@ public abstract class BaseController { @Autowired protected AttributesService attributesService; + @Autowired + protected ClaimDevicesService claimDevicesService; + @Value("${server.log_controller_error_stack_trace}") @Getter private boolean logControllerErrorStackTrace; diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index 0fd5ccfdfa..3753262bca 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -15,8 +15,11 @@ */ package org.thingsboard.server.controller; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; @@ -26,27 +29,31 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.device.DeviceSearchQuery; -import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; -import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.controller.claim.data.ClaimRequest; +import org.thingsboard.server.dao.device.claim.ClaimResponse; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; +import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -55,7 +62,8 @@ import java.util.stream.Collectors; @RequestMapping("/api") public class DeviceController extends BaseController { - public static final String DEVICE_ID = "deviceId"; + private static final String DEVICE_ID = "deviceId"; + private static final String DEVICE_NAME = "deviceName"; @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/device/{deviceId}", method = RequestMethod.GET) @@ -379,4 +387,91 @@ public class DeviceController extends BaseController { throw handleException(e); } } + + @PreAuthorize("hasAnyAuthority('CUSTOMER_USER')") + @RequestMapping(value = "/customer/device/{deviceName}/claim", method = RequestMethod.POST) + @ResponseBody + public DeferredResult claimDevice(@PathVariable(DEVICE_NAME) String deviceName, + @RequestBody(required = false) ClaimRequest claimRequest) throws ThingsboardException { + checkParameter(DEVICE_NAME, deviceName); + try { + final DeferredResult deferredResult = new DeferredResult<>(); + + SecurityUser user = getCurrentUser(); + TenantId tenantId = user.getTenantId(); + CustomerId customerId = user.getCustomerId(); + + Device device = checkNotNull(deviceService.findDeviceByTenantIdAndName(tenantId, deviceName)); + accessControlService.checkPermission(user, Resource.DEVICE, Operation.CLAIM_DEVICES, + device.getId(), device); + String secretKey = getSecretKey(claimRequest); + + ListenableFuture future = claimDevicesService.claimDevice(device, customerId, secretKey); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable ClaimResponse result) { + HttpStatus status; + if (result.equals(ClaimResponse.SUCCESS)) { + status = HttpStatus.OK; + } else { + status = HttpStatus.BAD_REQUEST; + } + deferredResult.setResult(new ResponseEntity<>(result, status)); + } + + @Override + public void onFailure(Throwable t) { + deferredResult.setErrorResult(t); + } + }); + return deferredResult; + } catch (Exception e) { + throw handleException(e); + } + } + + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/customer/device/{deviceName}/claim", method = RequestMethod.DELETE) + @ResponseStatus(value = HttpStatus.OK) + public DeferredResult reClaimDevice(@PathVariable(DEVICE_NAME) String deviceName) throws ThingsboardException { + checkParameter(DEVICE_NAME, deviceName); + try { + final DeferredResult deferredResult = new DeferredResult<>(); + + SecurityUser user = getCurrentUser(); + TenantId tenantId = user.getTenantId(); + + Device device = checkNotNull(deviceService.findDeviceByTenantIdAndName(tenantId, deviceName)); + accessControlService.checkPermission(user, Resource.DEVICE, Operation.CLAIM_DEVICES, + device.getId(), device); + + ListenableFuture> future = claimDevicesService.reClaimDevice(tenantId, device); + Futures.addCallback(future, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List result) { + if (result != null) { + deferredResult.setResult(new ResponseEntity(HttpStatus.OK)); + } else { + deferredResult.setResult(new ResponseEntity(HttpStatus.BAD_REQUEST)); + } + } + + @Override + public void onFailure(Throwable t) { + deferredResult.setErrorResult(t); + } + }); + return deferredResult; + } catch (Exception e) { + throw handleException(e); + } + } + + private String getSecretKey(ClaimRequest claimRequest) throws IOException { + String secretKey = claimRequest.getSecretKey(); + if (secretKey != null) { + return secretKey; + } + return DataConstants.DEFAULT_SECRET_KEY; + } } diff --git a/application/src/main/java/org/thingsboard/server/controller/claim/data/ClaimRequest.java b/application/src/main/java/org/thingsboard/server/controller/claim/data/ClaimRequest.java new file mode 100644 index 0000000000..6620aa187e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/controller/claim/data/ClaimRequest.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.controller.claim.data; + +import lombok.Data; + +@Data +public class ClaimRequest { + + private final String secretKey; + +} diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPremissions.java b/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java similarity index 78% rename from application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPremissions.java rename to application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java index aa98e4ae40..497bc3da2f 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPremissions.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java @@ -16,20 +16,20 @@ package org.thingsboard.server.service.security.permission; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.*; +import org.thingsboard.server.common.data.DashboardInfo; +import org.thingsboard.server.common.data.HasCustomerId; +import org.thingsboard.server.common.data.HasTenantId; +import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.service.security.model.SecurityUser; -import java.util.HashMap; +@Component(value = "customerUserPermissions") +public class CustomerUserPermissions extends AbstractPermissions { -@Component(value="customerUserPermissions") -public class CustomerUserPremissions extends AbstractPermissions { - - public CustomerUserPremissions() { + public CustomerUserPermissions() { super(); put(Resource.ALARM, TenantAdminPermissions.tenantEntityPermissionChecker); put(Resource.ASSET, customerEntityPermissionChecker); @@ -44,26 +44,26 @@ public class CustomerUserPremissions extends AbstractPermissions { private static final PermissionChecker customerEntityPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ, Operation.READ_CREDENTIALS, - Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY, Operation.RPC_CALL) { + Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY, Operation.RPC_CALL, Operation.CLAIM_DEVICES) { - @Override - public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { + @Override + public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { - if (!super.hasPermission(user, operation, entityId, entity)) { - return false; - } - if (!user.getTenantId().equals(entity.getTenantId())) { - return false; - } - if (!(entity instanceof HasCustomerId)) { - return false; - } - if (!user.getCustomerId().equals(((HasCustomerId)entity).getCustomerId())) { - return false; - } - return true; - } - }; + if (!super.hasPermission(user, operation, entityId, entity)) { + return false; + } + if (!user.getTenantId().equals(entity.getTenantId())) { + return false; + } + if (!(entity instanceof HasCustomerId)) { + return false; + } + if (!operation.equals(Operation.CLAIM_DEVICES) && !user.getCustomerId().equals(((HasCustomerId) entity).getCustomerId())) { + return false; + } + return true; + } + }; private static final PermissionChecker customerPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ, Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY) { diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/Operation.java b/application/src/main/java/org/thingsboard/server/service/security/permission/Operation.java index 97f044743c..1d81cdaf43 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/Operation.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/Operation.java @@ -18,6 +18,6 @@ package org.thingsboard.server.service.security.permission; public enum Operation { ALL, CREATE, READ, WRITE, DELETE, ASSIGN_TO_CUSTOMER, UNASSIGN_FROM_CUSTOMER, RPC_CALL, - READ_CREDENTIALS, WRITE_CREDENTIALS, READ_ATTRIBUTES, WRITE_ATTRIBUTES, READ_TELEMETRY, WRITE_TELEMETRY + READ_CREDENTIALS, WRITE_CREDENTIALS, READ_ATTRIBUTES, WRITE_ATTRIBUTES, READ_TELEMETRY, WRITE_TELEMETRY, CLAIM_DEVICES } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java index 5e88e097c3..4dea106958 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java @@ -22,11 +22,31 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.util.DonAsynchron; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.service.AbstractTransportService; +import org.thingsboard.server.dao.device.ClaimDevicesService; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.*; +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; @@ -35,6 +55,7 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Optional; +import java.util.UUID; import java.util.function.Consumer; /** @@ -58,6 +79,8 @@ public class LocalTransportService extends AbstractTransportService implements R private ClusterRpcService rpcService; @Autowired private DataDecodingEncodingService encodingService; + @Autowired + private ClaimDevicesService claimDevicesService; @PostConstruct public void init() { @@ -150,6 +173,23 @@ public class LocalTransportService extends AbstractTransportService implements R forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback); } + @Override + protected void registerClaimingInfo(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback callback) { + TransportToDeviceActorMsg toDeviceActorMsg = TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setClaimDevice(msg).build(); + + TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg); + Optional address = routingService.resolveById(wrapper.getDeviceId()); + if (address.isPresent()) { + rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper)); + callback.onSuccess(null); + } else { + TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); + DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB())); + DonAsynchron.withCallback(claimDevicesService.registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs()), + callback::onSuccess, callback::onError); + } + } + @Override public void process(String nodeId, DeviceActorToTransportMsg msg) { process(nodeId, msg, null, null); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index deb57b3d73..af0498fda9 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -103,6 +103,11 @@ security: user_token_access_enabled: "${SECURITY_USER_TOKEN_ACCESS_ENABLED:true}" # Enable/disable case-sensitive username login user_login_case_sensitive: "${SECURITY_USER_LOGIN_CASE_SENSITIVE:true}" + claim: + # Enable/disable claiming devices, if false -> the device's [claimingAllowed] SERVER_SCOPE attribute must be set to [true] to allow claiming specific device + allowClaimingByDefault: "${SECURITY_CLAIM_ALLOW_CLAIMING_BY_DEFAULT:true}" + # Time allowed to claim the device in milliseconds + duration: "${SECURITY_CLAIM_DURATION:60000}" # 1 minute, note this value must equal claimDevices.timeToLiveInMinutes value # Dashboard parameters dashboard: @@ -261,6 +266,9 @@ caffeine: entityViews: timeToLiveInMinutes: 1440 maxSize: 100000 + claimDevices: + timeToLiveInMinutes: 1 + maxSize: 100000 redis: # standalone or cluster diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java index c16d3ee78f..67f0fb9a0b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java @@ -22,4 +22,5 @@ public class CacheConstants { public static final String SESSIONS_CACHE = "sessions"; public static final String ASSET_CACHE = "assets"; public static final String ENTITY_VIEW_CACHE = "entityViews"; + public static final String CLAIM_DEVICES_CACHE = "claimDevices"; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index d8c0599993..2e2130c4d4 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -59,4 +59,8 @@ public class DataConstants { public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; + public static final String DEFAULT_SECRET_KEY = ""; + public static final String SECRET_KEY_FIELD_NAME = "secretKey"; + public static final String DURATION_MS_FIELD_NAME = "durationMs"; + } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/FeatureType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/FeatureType.java index 4a35a2d330..d4f3ed9a45 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/FeatureType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/FeatureType.java @@ -16,5 +16,5 @@ package org.thingsboard.server.common.msg.session; public enum FeatureType { - ATTRIBUTES, TELEMETRY, RPC + ATTRIBUTES, TELEMETRY, RPC, CLAIM } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionMsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionMsgType.java index a8bab464d7..74726e7a96 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionMsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionMsgType.java @@ -28,7 +28,9 @@ public enum SessionMsgType { RULE_ENGINE_ERROR, - SESSION_OPEN, SESSION_CLOSE; + SESSION_OPEN, SESSION_CLOSE, + + CLAIM_REQUEST(); private final boolean requiresRulesProcessing; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 64297adca1..1f7cd7cb67 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; import org.eclipse.californium.core.coap.CoAP.ResponseCode; import org.eclipse.californium.core.coap.Request; -import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.network.Exchange; import org.eclipse.californium.core.network.ExchangeObserver; import org.eclipse.californium.core.server.resources.CoapExchange; @@ -122,6 +121,9 @@ public class CoapTransportResource extends CoapResource { processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST); } break; + case CLAIM: + processRequest(exchange, SessionMsgType.CLAIM_REQUEST); + break; } } } @@ -153,6 +155,11 @@ public class CoapTransportResource extends CoapResource { transportContext.getAdaptor().convertToPostTelemetry(sessionId, request), new CoapOkCallback(exchange)); break; + case CLAIM_REQUEST: + transportService.process(sessionInfo, + transportContext.getAdaptor().convertToClaimDevice(sessionId, request, sessionInfo), + new CoapOkCallback(exchange)); + break; case SUBSCRIBE_ATTRIBUTES_REQUEST: advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced), registerAsyncCoapSession(exchange, request, sessionInfo, sessionId))); @@ -319,7 +326,7 @@ public class CoapTransportResource extends CoapResource { @Override public void onSuccess(Void msg) { - exchange.respond(ResponseCode.VALID); + exchange.respond(ResponseCode.VALID); } @Override diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java index fa8b3e1df5..97424b3943 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java @@ -22,7 +22,6 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.CoapTransportResource; import java.util.UUID; -import java.util.Optional; public interface CoapTransportAdaptor { @@ -36,6 +35,8 @@ public interface CoapTransportAdaptor { TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(UUID sessionId, Request inbound) throws AdaptorException; + TransportProtos.ClaimDeviceMsg convertToClaimDevice(UUID sessionId, Request inbound, TransportProtos.SessionInfoProto sessionInfo) throws AdaptorException; + Response convertToPublish(CoapTransportResource.CoapSessionListener session, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException; Response convertToPublish(CoapTransportResource.CoapSessionListener session, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java index ecbf49d740..5649dde657 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java @@ -15,33 +15,36 @@ */ package org.thingsboard.server.transport.coap.adaptors; -import java.util.*; - import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; -import org.thingsboard.server.common.msg.kv.AttributesKVMsg; -import org.thingsboard.server.common.msg.session.SessionContext; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.springframework.stereotype.Component; - -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.CoapTransportResource; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + @Component("JsonCoapAdaptor") @Slf4j public class JsonCoapAdaptor implements CoapTransportAdaptor { @Override public TransportProtos.PostTelemetryMsg convertToPostTelemetry(UUID sessionId, Request inbound) throws AdaptorException { - String payload = validatePayload(sessionId, inbound); + String payload = validatePayload(sessionId, inbound, false); try { return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload)); } catch (IllegalStateException | JsonSyntaxException ex) { @@ -51,7 +54,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { @Override public TransportProtos.PostAttributeMsg convertToPostAttributes(UUID sessionId, Request inbound) throws AdaptorException { - String payload = validatePayload(sessionId, inbound); + String payload = validatePayload(sessionId, inbound, false); try { return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload)); } catch (IllegalStateException | JsonSyntaxException ex) { @@ -79,7 +82,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { @Override public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound) throws AdaptorException { Optional requestId = CoapTransportResource.getRequestId(inbound); - String payload = validatePayload(sessionId, inbound); + String payload = validatePayload(sessionId, inbound, false); JsonObject response = new JsonParser().parse(payload).getAsJsonObject(); return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId.orElseThrow(() -> new AdaptorException("Request id is missing!"))) .setPayload(response.toString()).build(); @@ -87,10 +90,21 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { @Override public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(UUID sessionId, Request inbound) throws AdaptorException { - String payload = validatePayload(sessionId, inbound); + String payload = validatePayload(sessionId, inbound, false); return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0); } + @Override + public TransportProtos.ClaimDeviceMsg convertToClaimDevice(UUID sessionId, Request inbound, TransportProtos.SessionInfoProto sessionInfo) throws AdaptorException { + DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); + String payload = validatePayload(sessionId, inbound, true); + try { + return JsonConverter.convertToClaimDeviceProto(deviceId, payload); + } catch (IllegalStateException | JsonSyntaxException ex) { + throw new AdaptorException(ex); + } + } + @Override public Response convertToPublish(CoapTransportResource.CoapSessionListener session, TransportProtos.AttributeUpdateNotificationMsg msg) throws AdaptorException { return getObserveNotification(session.getNextSeqNumber(), JsonConverter.toJson(msg)); @@ -128,11 +142,13 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { return response; } - private String validatePayload(UUID sessionId, Request inbound) throws AdaptorException { + private String validatePayload(UUID sessionId, Request inbound, boolean isEmptyPayloadAllowed) throws AdaptorException { String payload = inbound.getPayloadString(); if (payload == null) { log.warn("[{}] Payload is empty!", sessionId); - throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); + if (!isEmptyPayloadAllowed) { + throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); + } } return payload; } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index 0c0a63f357..8bc2c883c6 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -20,7 +20,6 @@ import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.util.StringUtils; @@ -31,6 +30,7 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportService; @@ -119,6 +119,20 @@ public class DeviceApiController { return responseWriter; } + @RequestMapping(value = "/{deviceToken}/claim", method = RequestMethod.POST) + public DeferredResult claimDevice(@PathVariable("deviceToken") String deviceToken, + @RequestBody(required = false) String json, HttpServletRequest request) { + DeferredResult responseWriter = new DeferredResult<>(); + transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), + new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { + TransportService transportService = transportContext.getTransportService(); + DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); + transportService.process(sessionInfo, JsonConverter.convertToClaimDeviceProto(deviceId, json), + new HttpOkCallback(responseWriter)); + })); + return responseWriter; + } + @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json") public DeferredResult subscribeToCommands(@PathVariable("deviceToken") String deviceToken, @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java index 0d4096d56d..e9e21d8bb7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java @@ -29,6 +29,7 @@ public class MqttTopics { public static final String DEVICE_ATTRIBUTES_RESPONSES_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + "+"; public static final String DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC + "/attributes/request/"; public static final String DEVICE_TELEMETRY_TOPIC = BASE_DEVICE_API_TOPIC + "/telemetry"; + public static final String DEVICE_CLAIM_TOPIC = BASE_DEVICE_API_TOPIC + "/claim"; public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes"; public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway"; @@ -36,6 +37,7 @@ public class MqttTopics { public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/disconnect"; public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes"; public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry"; + public static final String GATEWAY_CLAIM_TOPIC = BASE_GATEWAY_API_TOPIC + "/claim"; public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc"; public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/request"; public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/response"; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 382211e8a5..d7d2281b4f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -38,11 +38,11 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; +import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.transport.service.AbstractTransportService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; @@ -183,6 +183,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_TELEMETRY_TOPIC: gatewaySessionHandler.onDeviceTelemetry(mqttMsg); break; + case MqttTopics.GATEWAY_CLAIM_TOPIC: + gatewaySessionHandler.onDeviceClaim(mqttMsg); + break; case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: gatewaySessionHandler.onDeviceAttributes(mqttMsg); break; @@ -221,6 +224,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg); transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { + TransportProtos.ClaimDeviceMsg claimDeviceMsg = adaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); + transportService.process(sessionInfo, claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); } } catch (AdaptorException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index a263c649b6..db2c2193bc 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -57,7 +57,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @Override public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - String payload = validatePayload(ctx.getSessionId(), inbound.payload()); + String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false); try { return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload)); } catch (IllegalStateException | JsonSyntaxException ex) { @@ -67,7 +67,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @Override public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - String payload = validatePayload(ctx.getSessionId(), inbound.payload()); + String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false); try { return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload)); } catch (IllegalStateException | JsonSyntaxException ex) { @@ -114,7 +114,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @Override public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { String topicName = inbound.variableHeader().topicName(); - String payload = validatePayload(ctx.getSessionId(), inbound.payload()); + String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false); try { Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length())); return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId); @@ -123,6 +123,16 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } + @Override + public TransportProtos.ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { + String payload = validatePayload(ctx.getSessionId(), inbound.payload(), true); + try { + return JsonConverter.convertToClaimDeviceProto(ctx.getDeviceId(), payload); + } catch (IllegalStateException | JsonSyntaxException ex) { + throw new AdaptorException(ex); + } + } + @Override public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { if (!StringUtils.isEmpty(responseMsg.getError())) { @@ -193,7 +203,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException { - String payload = validatePayload(sessionId, payloadData); + String payload = validatePayload(sessionId, payloadData, false); try { return new JsonParser().parse(payload); } catch (JsonSyntaxException ex) { @@ -201,12 +211,14 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } - private static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException { + private static String validatePayload(UUID sessionId, ByteBuf payloadData, boolean isEmptyPayloadAllowed) throws AdaptorException { try { String payload = payloadData.toString(UTF8); if (payload == null) { log.warn("[{}] Payload is empty!", sessionId); - throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); + if (!isEmptyPayloadAllowed) { + throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); + } } return payload; } finally { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index a6d746caf8..a3465b2961 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; @@ -46,6 +47,8 @@ public interface MqttTransportAdaptor { ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException; + ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; + Optional convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException; Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException; @@ -59,4 +62,5 @@ public interface MqttTransportAdaptor { Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException; + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index e43b8dde9c..6d1fcf1316 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -30,6 +30,7 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -183,7 +184,42 @@ public class GatewaySessionHandler { @Override public void onFailure(Throwable t) { - log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t); + log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t); + } + }, context.getExecutor()); + } + } else { + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); + } + } + + public void onDeviceClaim(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload()); + int msgId = mqttMsg.variableHeader().packetId(); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + for (Map.Entry deviceEntry : jsonObj.entrySet()) { + String deviceName = deviceEntry.getKey(); + Futures.addCallback(checkDeviceConnected(deviceName), + new FutureCallback() { + @Override + public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) { + if (!deviceEntry.getValue().isJsonObject()) { + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); + } + try { + DeviceId deviceId = deviceCtx.getDeviceId(); + TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue()); + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg)); + } catch (Throwable e) { + UUID gatewayId = new UUID(gateway.getDeviceIdMSB(), gateway.getDeviceIdLSB()); + log.warn("[{}][{}] Failed to convert claim message: {}", gatewayId, deviceName, deviceEntry.getValue(), e); + } + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}] Failed to process device claiming command: {}", sessionId, deviceName, t); } }, context.getExecutor()); } @@ -209,6 +245,7 @@ public class GatewaySessionHandler { TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(deviceEntry.getValue().getAsJsonObject()); transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); } + @Override public void onFailure(Throwable t) { log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 8d8d450c78..215a91e7fe 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -16,18 +16,19 @@ package org.thingsboard.server.common.transport; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; /** * Created by ashvayka on 04.10.18. @@ -63,6 +64,8 @@ public interface TransportService { void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback callback); + void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener); void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java index eea2cec6ae..24b1343491 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java @@ -24,6 +24,8 @@ import com.google.gson.JsonPrimitive; import com.google.gson.JsonSyntaxException; import org.apache.commons.lang3.math.NumberUtils; import org.springframework.util.StringUtils; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; @@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.msg.kv.AttributesKVMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; @@ -63,23 +66,69 @@ public class JsonConverter { private static int maxStringValueLength = 0; - public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException { - long systemTs = System.currentTimeMillis(); + public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException { PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder(); - if (jsonObject.isJsonObject()) { - parseObject(builder, systemTs, jsonObject); - } else if (jsonObject.isJsonArray()) { - jsonObject.getAsJsonArray().forEach(je -> { + convertToTelemetry(jsonElement, System.currentTimeMillis(), null, builder); + return builder.build(); + } + + private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map> result, PostTelemetryMsg.Builder builder) { + if (jsonElement.isJsonObject()) { + parseObject(systemTs, result, builder, jsonElement.getAsJsonObject()); + } else if (jsonElement.isJsonArray()) { + jsonElement.getAsJsonArray().forEach(je -> { if (je.isJsonObject()) { - parseObject(builder, systemTs, je.getAsJsonObject()); + parseObject(systemTs, result, builder, je.getAsJsonObject()); } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je); } }); } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject); + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonElement); } - return builder.build(); + } + + private static void parseObject(long systemTs, Map> result, PostTelemetryMsg.Builder builder, JsonObject jo) { + if (result != null) { + parseObject(result, systemTs, jo); + } else { + parseObject(builder, systemTs, jo); + } + } + + public static ClaimDeviceMsg convertToClaimDeviceProto(DeviceId deviceId, String json) { + long durationMs = 0L; + if (json != null && !json.isEmpty()) { + return convertToClaimDeviceProto(deviceId, new JsonParser().parse(json)); + } + return buildClaimDeviceMsg(deviceId, DataConstants.DEFAULT_SECRET_KEY, durationMs); + } + + public static ClaimDeviceMsg convertToClaimDeviceProto(DeviceId deviceId, JsonElement jsonElement) { + String secretKey = DataConstants.DEFAULT_SECRET_KEY; + long durationMs = 0L; + if (jsonElement.isJsonObject()) { + JsonObject jo = jsonElement.getAsJsonObject(); + if (jo.has(DataConstants.SECRET_KEY_FIELD_NAME)) { + secretKey = jo.get(DataConstants.SECRET_KEY_FIELD_NAME).getAsString(); + } + if (jo.has(DataConstants.DURATION_MS_FIELD_NAME)) { + durationMs = jo.get(DataConstants.DURATION_MS_FIELD_NAME).getAsLong(); + } + } else { + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonElement); + } + return buildClaimDeviceMsg(deviceId, secretKey, durationMs); + } + + private static ClaimDeviceMsg buildClaimDeviceMsg(DeviceId deviceId, String secretKey, long durationMs) { + ClaimDeviceMsg.Builder result = ClaimDeviceMsg.newBuilder(); + return result + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setSecretKey(secretKey) + .setDurationMs(durationMs) + .build(); } public static PostAttributeMsg convertToAttributesProto(JsonElement jsonObject) throws JsonSyntaxException { @@ -103,8 +152,7 @@ public class JsonConverter { return result; } - private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) { - JsonObject jo = jsonObject.getAsJsonObject(); + private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonObject jo) { if (jo.has("ts") && jo.has("values")) { parseWithTs(builder, jo); } else { @@ -137,7 +185,7 @@ public class JsonConverter { String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength); throw new JsonSyntaxException(message); } - if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) { + if (isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) { try { result.add(buildNumericKeyValueProto(value, valueEntry.getKey())); } catch (RuntimeException th) { @@ -400,7 +448,7 @@ public class JsonConverter { String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength); throw new JsonSyntaxException(message); } - if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) { + if (isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) { try { parseNumericValue(result, valueEntry, value); } catch (RuntimeException th) { @@ -423,26 +471,13 @@ public class JsonConverter { return result; } - public static Map> convertToTelemetry(JsonElement jsonObject, long systemTs) throws JsonSyntaxException { + public static Map> convertToTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException { Map> result = new HashMap<>(); - if (jsonObject.isJsonObject()) { - parseObject(result, systemTs, jsonObject); - } else if (jsonObject.isJsonArray()) { - jsonObject.getAsJsonArray().forEach(je -> { - if (je.isJsonObject()) { - parseObject(result, systemTs, je.getAsJsonObject()); - } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je); - } - }); - } else { - throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject); - } + convertToTelemetry(jsonElement, systemTs, result, null); return result; } - private static void parseObject(Map> result, long systemTs, JsonElement jsonObject) { - JsonObject jo = jsonObject.getAsJsonObject(); + private static void parseObject(Map> result, long systemTs, JsonObject jo) { if (jo.has("ts") && jo.has("values")) { parseWithTs(result, jo); } else { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index f55cbbe1d5..5a9b01b73a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -28,7 +28,12 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Created by ashvayka on 17.10.18. @@ -127,6 +132,12 @@ public abstract class AbstractTransportService implements TransportService { } } + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, + TransportServiceCallback callback) { + registerClaimingInfo(sessionInfo, msg, callback); + } + @Override public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { reportActivityInternal(sessionInfo); @@ -148,6 +159,8 @@ public abstract class AbstractTransportService implements TransportService { protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback callback); + protected abstract void registerClaimingInfo(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback callback); + private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { UUID sessionId = toId(sessionInfo); SessionMetaData sessionMetaData = sessions.get(sessionId); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index 58f3ce22a9..ec7f491fed 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -24,34 +24,40 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.transport.SessionMsgListener; -import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.*; +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; -import org.thingsboard.server.kafka.*; +import org.thingsboard.server.kafka.AsyncCallbackTemplate; +import org.thingsboard.server.kafka.TBKafkaAdmin; +import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; +import org.thingsboard.server.kafka.TBKafkaProducerTemplate; +import org.thingsboard.server.kafka.TbKafkaRequestTemplate; +import org.thingsboard.server.kafka.TbKafkaSettings; +import org.thingsboard.server.kafka.TbNodeIdProvider; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -305,6 +311,15 @@ public class RemoteTransportService extends AbstractTransportService { send(sessionInfo, toRuleEngineMsg, callback); } + @Override + protected void registerClaimingInfo(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setClaimDevice(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + private static class TransportCallbackAdaptor implements Callback { private final TransportServiceCallback callback; diff --git a/common/transport/transport-api/src/main/proto/transport.proto b/common/transport/transport-api/src/main/proto/transport.proto index 329d205073..feedca4ab9 100644 --- a/common/transport/transport-api/src/main/proto/transport.proto +++ b/common/transport/transport-api/src/main/proto/transport.proto @@ -172,6 +172,13 @@ message ToServerRpcResponseMsg { string error = 3; } +message ClaimDeviceMsg { + int64 deviceIdMSB = 1; + int64 deviceIdLSB = 2; + string secretKey = 3; + int64 durationMs = 4; +} + //Used to report session state to tb-node and persist this state in the cache on the tb-node level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -199,6 +206,7 @@ message TransportToDeviceActorMsg { ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8; ToServerRpcRequestMsg toServerRPCCallRequest = 9; SubscriptionInfoProto subscriptionInfo = 10; + ClaimDeviceMsg claimDevice = 11; } message DeviceActorToTransportMsg { diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesService.java b/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesService.java new file mode 100644 index 0000000000..dad22a4a06 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesService.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.device; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.device.claim.ClaimResponse; + +import java.util.List; + +public interface ClaimDevicesService { + + ListenableFuture registerClaimingInfo(TenantId tenantId, DeviceId deviceId, String secretKey, long durationMs); + + ListenableFuture claimDevice(Device device, CustomerId customerId, String secretKey); + + ListenableFuture> reClaimDevice(TenantId tenantId, Device device); + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java new file mode 100644 index 0000000000..76a73c37d2 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java @@ -0,0 +1,171 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.device; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.device.claim.ClaimData; +import org.thingsboard.server.dao.device.claim.ClaimResponse; +import org.thingsboard.server.dao.model.ModelConstants; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.thingsboard.server.common.data.CacheConstants.CLAIM_DEVICES_CACHE; + +@Service +@Slf4j +public class ClaimDevicesServiceImpl implements ClaimDevicesService { + + private static final String CLAIM_ATTRIBUTE_NAME = "claimingAllowed"; + + @Autowired + private DeviceService deviceService; + @Autowired + private AttributesService attributesService; + @Autowired + private CacheManager cacheManager; + + @Value("${security.claim.allowClaimingByDefault}") + private boolean isAllowedClaimingByDefault; + + @Value("${security.claim.duration}") + private long systemDurationMs; + + @Override + public ListenableFuture registerClaimingInfo(TenantId tenantId, DeviceId deviceId, String secretKey, long durationMs) { + ListenableFuture deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId); + return Futures.transformAsync(deviceFuture, device -> { + Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE); + List key = constructCacheKey(device.getId()); + + if (isAllowedClaimingByDefault) { + if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) { + persistInCache(secretKey, durationMs, cache, key); + return Futures.immediateFuture(null); + } + log.warn("The device [{}] has been already claimed!", device.getName()); + throw new IllegalArgumentException(); + } else { + ListenableFuture> claimingAllowedFuture = attributesService.find(tenantId, device.getId(), + DataConstants.SERVER_SCOPE, Collections.singletonList(CLAIM_ATTRIBUTE_NAME)); + return Futures.transform(claimingAllowedFuture, list -> { + if (list != null && !list.isEmpty()) { + Optional claimingAllowedOptional = list.get(0).getBooleanValue(); + if (claimingAllowedOptional.isPresent() && claimingAllowedOptional.get() + && device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) { + persistInCache(secretKey, durationMs, cache, key); + return null; + } + } + log.warn("Failed to find claimingAllowed attribute for device or it is already claimed![{}]", device.getName()); + throw new IllegalArgumentException(); + }); + } + }); + } + + @Override + public ListenableFuture claimDevice(Device device, CustomerId customerId, String secretKey) { + List key = constructCacheKey(device.getId()); + Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE); + ClaimData claimData = cache.get(key, ClaimData.class); + if (claimData != null) { + long currTs = System.currentTimeMillis(); + if (currTs > claimData.getExpirationTime() || !secretKey.equals(claimData.getSecretKey())) { + log.warn("The claiming timeout occurred or wrong 'secretKey' provided for the device [{}]", device.getName()); + cache.evict(key); + return Futures.immediateFuture(ClaimResponse.FAILURE); + } else { + if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) { + device.setCustomerId(customerId); + deviceService.saveDevice(device); + return Futures.transform(removeClaimingSavedData(cache, key, device), result -> ClaimResponse.SUCCESS); + } + return Futures.transform(removeClaimingSavedData(cache, key, device), result -> ClaimResponse.CLAIMED); + } + } else { + log.warn("Failed to find the device's claiming message![{}]", device.getName()); + return Futures.immediateFuture(ClaimResponse.CLAIMED); + } + } + + @Override + public ListenableFuture> reClaimDevice(TenantId tenantId, Device device) { + if (!device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) { + cacheEviction(device.getId()); + + device.setCustomerId(null); + deviceService.saveDevice(device); + if (isAllowedClaimingByDefault) { + return Futures.immediateFuture(Collections.emptyList()); + } + return attributesService.save(tenantId, device.getId(), DataConstants.SERVER_SCOPE, Collections.singletonList( + new BaseAttributeKvEntry(new BooleanDataEntry(CLAIM_ATTRIBUTE_NAME, true), + System.currentTimeMillis()))); + } + cacheEviction(device.getId()); + return Futures.immediateFuture(Collections.emptyList()); + } + + private List constructCacheKey(DeviceId deviceId) { + return Collections.singletonList(deviceId); + } + + private void persistInCache(String secretKey, long durationMs, Cache cache, List key) { + ClaimData claimData = new ClaimData(secretKey, + System.currentTimeMillis() + validateDurationMs(durationMs)); + cache.putIfAbsent(key, claimData); + } + + private long validateDurationMs(long durationMs) { + if (durationMs > 0L) { + return durationMs; + } + return systemDurationMs; + } + + private ListenableFuture> removeClaimingSavedData(Cache cache, List key, Device device) { + cache.evict(key); + if (isAllowedClaimingByDefault) { + return Futures.immediateFuture(null); + } + return attributesService.removeAll(device.getTenantId(), + device.getId(), DataConstants.SERVER_SCOPE, Collections.singletonList(CLAIM_ATTRIBUTE_NAME)); + } + + private void cacheEviction(DeviceId deviceId) { + Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE); + cache.evict(constructCacheKey(deviceId)); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimData.java b/dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimData.java new file mode 100644 index 0000000000..5f7b0c4b78 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimData.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.device.claim; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@AllArgsConstructor +@Data +public class ClaimData { + + private final String secretKey; + private final long expirationTime; + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimResponse.java b/dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimResponse.java new file mode 100644 index 0000000000..715edae0f8 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimResponse.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.device.claim; + +public enum ClaimResponse { + + SUCCESS, + FAILURE, + CLAIMED + +} diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index d8f18e090f..263da99a4b 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -27,11 +27,16 @@ caffeine.specs.assets.maxSize=100000 caffeine.specs.entityViews.timeToLiveInMinutes=1440 caffeine.specs.entityViews.maxSize=100000 +caffeine.specs.claimDevices.timeToLiveInMinutes=1440 +caffeine.specs.claimDevices.maxSize=100000 + redis.connection.host=localhost redis.connection.port=6379 redis.connection.db=0 redis.connection.password= security.user_login_case_sensitive=true +security.claim.allowClaimingByDefault=true +security.claim.duration=60000 -database.ts_max_intervals=700 \ No newline at end of file +database.ts_max_intervals=700