Browse Source

Claiming devices implementation

pull/1823/head
Igor Kulikov 7 years ago
committed by GitHub
parent
commit
6cdb508c71
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      application/src/main/java/org/thingsboard/server/controller/BaseController.java
  2. 101
      application/src/main/java/org/thingsboard/server/controller/DeviceController.java
  3. 25
      application/src/main/java/org/thingsboard/server/controller/claim/data/ClaimRequest.java
  4. 50
      application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java
  5. 2
      application/src/main/java/org/thingsboard/server/service/security/permission/Operation.java
  6. 42
      application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
  7. 8
      application/src/main/resources/thingsboard.yml
  8. 1
      common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
  9. 4
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  10. 2
      common/message/src/main/java/org/thingsboard/server/common/msg/session/FeatureType.java
  11. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionMsgType.java
  12. 11
      common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
  13. 3
      common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java
  14. 44
      common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
  15. 16
      common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
  16. 2
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java
  17. 8
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  18. 24
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
  19. 4
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
  20. 39
      common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
  21. 15
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
  22. 93
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
  23. 15
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
  24. 35
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
  25. 8
      common/transport/transport-api/src/main/proto/transport.proto
  26. 35
      dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesService.java
  27. 171
      dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java
  28. 28
      dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimData.java
  29. 24
      dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimResponse.java
  30. 7
      dao/src/test/resources/application-test.properties

31
application/src/main/java/org/thingsboard/server/controller/BaseController.java

@ -28,7 +28,16 @@ import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.*;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DashboardInfo;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmId;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
@ -36,7 +45,19 @@ import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.id.WidgetTypeId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.page.TextPageLink;
@ -45,7 +66,6 @@ import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.common.msg.TbMsg;
@ -53,13 +73,13 @@ import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.ClaimDevicesService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.entityview.EntityViewService;
@ -162,6 +182,9 @@ public abstract class BaseController {
@Autowired
protected AttributesService attributesService;
@Autowired
protected ClaimDevicesService claimDevicesService;
@Value("${server.log_controller_error_stack_trace}")
@Getter
private boolean logControllerErrorStackTrace;

101
application/src/main/java/org/thingsboard/server/controller/DeviceController.java

@ -15,8 +15,11 @@
*/
package org.thingsboard.server.controller;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
@ -26,27 +29,31 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.controller.claim.data.ClaimRequest;
import org.thingsboard.server.dao.device.claim.ClaimResponse;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.security.permission.Resource;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@ -55,7 +62,8 @@ import java.util.stream.Collectors;
@RequestMapping("/api")
public class DeviceController extends BaseController {
public static final String DEVICE_ID = "deviceId";
private static final String DEVICE_ID = "deviceId";
private static final String DEVICE_NAME = "deviceName";
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/device/{deviceId}", method = RequestMethod.GET)
@ -379,4 +387,91 @@ public class DeviceController extends BaseController {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('CUSTOMER_USER')")
@RequestMapping(value = "/customer/device/{deviceName}/claim", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> claimDevice(@PathVariable(DEVICE_NAME) String deviceName,
@RequestBody(required = false) ClaimRequest claimRequest) throws ThingsboardException {
checkParameter(DEVICE_NAME, deviceName);
try {
final DeferredResult<ResponseEntity> deferredResult = new DeferredResult<>();
SecurityUser user = getCurrentUser();
TenantId tenantId = user.getTenantId();
CustomerId customerId = user.getCustomerId();
Device device = checkNotNull(deviceService.findDeviceByTenantIdAndName(tenantId, deviceName));
accessControlService.checkPermission(user, Resource.DEVICE, Operation.CLAIM_DEVICES,
device.getId(), device);
String secretKey = getSecretKey(claimRequest);
ListenableFuture<ClaimResponse> future = claimDevicesService.claimDevice(device, customerId, secretKey);
Futures.addCallback(future, new FutureCallback<ClaimResponse>() {
@Override
public void onSuccess(@Nullable ClaimResponse result) {
HttpStatus status;
if (result.equals(ClaimResponse.SUCCESS)) {
status = HttpStatus.OK;
} else {
status = HttpStatus.BAD_REQUEST;
}
deferredResult.setResult(new ResponseEntity<>(result, status));
}
@Override
public void onFailure(Throwable t) {
deferredResult.setErrorResult(t);
}
});
return deferredResult;
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/customer/device/{deviceName}/claim", method = RequestMethod.DELETE)
@ResponseStatus(value = HttpStatus.OK)
public DeferredResult<ResponseEntity> reClaimDevice(@PathVariable(DEVICE_NAME) String deviceName) throws ThingsboardException {
checkParameter(DEVICE_NAME, deviceName);
try {
final DeferredResult<ResponseEntity> deferredResult = new DeferredResult<>();
SecurityUser user = getCurrentUser();
TenantId tenantId = user.getTenantId();
Device device = checkNotNull(deviceService.findDeviceByTenantIdAndName(tenantId, deviceName));
accessControlService.checkPermission(user, Resource.DEVICE, Operation.CLAIM_DEVICES,
device.getId(), device);
ListenableFuture<List<Void>> future = claimDevicesService.reClaimDevice(tenantId, device);
Futures.addCallback(future, new FutureCallback<List<Void>>() {
@Override
public void onSuccess(@Nullable List<Void> result) {
if (result != null) {
deferredResult.setResult(new ResponseEntity(HttpStatus.OK));
} else {
deferredResult.setResult(new ResponseEntity(HttpStatus.BAD_REQUEST));
}
}
@Override
public void onFailure(Throwable t) {
deferredResult.setErrorResult(t);
}
});
return deferredResult;
} catch (Exception e) {
throw handleException(e);
}
}
private String getSecretKey(ClaimRequest claimRequest) throws IOException {
String secretKey = claimRequest.getSecretKey();
if (secretKey != null) {
return secretKey;
}
return DataConstants.DEFAULT_SECRET_KEY;
}
}

25
application/src/main/java/org/thingsboard/server/controller/claim/data/ClaimRequest.java

@ -0,0 +1,25 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller.claim.data;
import lombok.Data;
@Data
public class ClaimRequest {
private final String secretKey;
}

50
application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPremissions.java → application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java

@ -16,20 +16,20 @@
package org.thingsboard.server.service.security.permission;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.*;
import org.thingsboard.server.common.data.DashboardInfo;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.HasTenantId;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.HashMap;
@Component(value = "customerUserPermissions")
public class CustomerUserPermissions extends AbstractPermissions {
@Component(value="customerUserPermissions")
public class CustomerUserPremissions extends AbstractPermissions {
public CustomerUserPremissions() {
public CustomerUserPermissions() {
super();
put(Resource.ALARM, TenantAdminPermissions.tenantEntityPermissionChecker);
put(Resource.ASSET, customerEntityPermissionChecker);
@ -44,26 +44,26 @@ public class CustomerUserPremissions extends AbstractPermissions {
private static final PermissionChecker customerEntityPermissionChecker =
new PermissionChecker.GenericPermissionChecker(Operation.READ, Operation.READ_CREDENTIALS,
Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY, Operation.RPC_CALL) {
Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY, Operation.RPC_CALL, Operation.CLAIM_DEVICES) {
@Override
public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) {
@Override
public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) {
if (!super.hasPermission(user, operation, entityId, entity)) {
return false;
}
if (!user.getTenantId().equals(entity.getTenantId())) {
return false;
}
if (!(entity instanceof HasCustomerId)) {
return false;
}
if (!user.getCustomerId().equals(((HasCustomerId)entity).getCustomerId())) {
return false;
}
return true;
}
};
if (!super.hasPermission(user, operation, entityId, entity)) {
return false;
}
if (!user.getTenantId().equals(entity.getTenantId())) {
return false;
}
if (!(entity instanceof HasCustomerId)) {
return false;
}
if (!operation.equals(Operation.CLAIM_DEVICES) && !user.getCustomerId().equals(((HasCustomerId) entity).getCustomerId())) {
return false;
}
return true;
}
};
private static final PermissionChecker customerPermissionChecker =
new PermissionChecker.GenericPermissionChecker(Operation.READ, Operation.READ_ATTRIBUTES, Operation.READ_TELEMETRY) {

2
application/src/main/java/org/thingsboard/server/service/security/permission/Operation.java

@ -18,6 +18,6 @@ package org.thingsboard.server.service.security.permission;
public enum Operation {
ALL, CREATE, READ, WRITE, DELETE, ASSIGN_TO_CUSTOMER, UNASSIGN_FROM_CUSTOMER, RPC_CALL,
READ_CREDENTIALS, WRITE_CREDENTIALS, READ_ATTRIBUTES, WRITE_ATTRIBUTES, READ_TELEMETRY, WRITE_TELEMETRY
READ_CREDENTIALS, WRITE_CREDENTIALS, READ_ATTRIBUTES, WRITE_ATTRIBUTES, READ_TELEMETRY, WRITE_TELEMETRY, CLAIM_DEVICES
}

42
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java

@ -22,11 +22,31 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.util.DonAsynchron;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.service.AbstractTransportService;
import org.thingsboard.server.dao.device.ClaimDevicesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
@ -35,6 +55,7 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
/**
@ -58,6 +79,8 @@ public class LocalTransportService extends AbstractTransportService implements R
private ClusterRpcService rpcService;
@Autowired
private DataDecodingEncodingService encodingService;
@Autowired
private ClaimDevicesService claimDevicesService;
@PostConstruct
public void init() {
@ -150,6 +173,23 @@ public class LocalTransportService extends AbstractTransportService implements R
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
}
@Override
protected void registerClaimingInfo(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
TransportToDeviceActorMsg toDeviceActorMsg = TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setClaimDevice(msg).build();
TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
if (address.isPresent()) {
rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
callback.onSuccess(null);
} else {
TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
DonAsynchron.withCallback(claimDevicesService.registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs()),
callback::onSuccess, callback::onError);
}
}
@Override
public void process(String nodeId, DeviceActorToTransportMsg msg) {
process(nodeId, msg, null, null);

8
application/src/main/resources/thingsboard.yml

@ -103,6 +103,11 @@ security:
user_token_access_enabled: "${SECURITY_USER_TOKEN_ACCESS_ENABLED:true}"
# Enable/disable case-sensitive username login
user_login_case_sensitive: "${SECURITY_USER_LOGIN_CASE_SENSITIVE:true}"
claim:
# Enable/disable claiming devices, if false -> the device's [claimingAllowed] SERVER_SCOPE attribute must be set to [true] to allow claiming specific device
allowClaimingByDefault: "${SECURITY_CLAIM_ALLOW_CLAIMING_BY_DEFAULT:true}"
# Time allowed to claim the device in milliseconds
duration: "${SECURITY_CLAIM_DURATION:60000}" # 1 minute, note this value must equal claimDevices.timeToLiveInMinutes value
# Dashboard parameters
dashboard:
@ -261,6 +266,9 @@ caffeine:
entityViews:
timeToLiveInMinutes: 1440
maxSize: 100000
claimDevices:
timeToLiveInMinutes: 1
maxSize: 100000
redis:
# standalone or cluster

1
common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java

@ -22,4 +22,5 @@ public class CacheConstants {
public static final String SESSIONS_CACHE = "sessions";
public static final String ASSET_CACHE = "assets";
public static final String ENTITY_VIEW_CACHE = "entityViews";
public static final String CLAIM_DEVICES_CACHE = "claimDevices";
}

4
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -59,4 +59,8 @@ public class DataConstants {
public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
public static final String DEFAULT_SECRET_KEY = "";
public static final String SECRET_KEY_FIELD_NAME = "secretKey";
public static final String DURATION_MS_FIELD_NAME = "durationMs";
}

2
common/message/src/main/java/org/thingsboard/server/common/msg/session/FeatureType.java

@ -16,5 +16,5 @@
package org.thingsboard.server.common.msg.session;
public enum FeatureType {
ATTRIBUTES, TELEMETRY, RPC
ATTRIBUTES, TELEMETRY, RPC, CLAIM
}

4
common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionMsgType.java

@ -28,7 +28,9 @@ public enum SessionMsgType {
RULE_ENGINE_ERROR,
SESSION_OPEN, SESSION_CLOSE;
SESSION_OPEN, SESSION_CLOSE,
CLAIM_REQUEST();
private final boolean requiresRulesProcessing;

11
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java

@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.coap.CoAP.ResponseCode;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.network.Exchange;
import org.eclipse.californium.core.network.ExchangeObserver;
import org.eclipse.californium.core.server.resources.CoapExchange;
@ -122,6 +121,9 @@ public class CoapTransportResource extends CoapResource {
processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST);
}
break;
case CLAIM:
processRequest(exchange, SessionMsgType.CLAIM_REQUEST);
break;
}
}
}
@ -153,6 +155,11 @@ public class CoapTransportResource extends CoapResource {
transportContext.getAdaptor().convertToPostTelemetry(sessionId, request),
new CoapOkCallback(exchange));
break;
case CLAIM_REQUEST:
transportService.process(sessionInfo,
transportContext.getAdaptor().convertToClaimDevice(sessionId, request, sessionInfo),
new CoapOkCallback(exchange));
break;
case SUBSCRIBE_ATTRIBUTES_REQUEST:
advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced),
registerAsyncCoapSession(exchange, request, sessionInfo, sessionId)));
@ -319,7 +326,7 @@ public class CoapTransportResource extends CoapResource {
@Override
public void onSuccess(Void msg) {
exchange.respond(ResponseCode.VALID);
exchange.respond(ResponseCode.VALID);
}
@Override

3
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java

@ -22,7 +22,6 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.coap.CoapTransportResource;
import java.util.UUID;
import java.util.Optional;
public interface CoapTransportAdaptor {
@ -36,6 +35,8 @@ public interface CoapTransportAdaptor {
TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(UUID sessionId, Request inbound) throws AdaptorException;
TransportProtos.ClaimDeviceMsg convertToClaimDevice(UUID sessionId, Request inbound, TransportProtos.SessionInfoProto sessionInfo) throws AdaptorException;
Response convertToPublish(CoapTransportResource.CoapSessionListener session, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
Response convertToPublish(CoapTransportResource.CoapSessionListener session, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;

44
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java

@ -15,33 +15,36 @@
*/
package org.thingsboard.server.transport.coap.adaptors;
import java.util.*;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
import org.thingsboard.server.common.msg.session.SessionContext;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.springframework.stereotype.Component;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.coap.CoapTransportResource;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@Component("JsonCoapAdaptor")
@Slf4j
public class JsonCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(UUID sessionId, Request inbound) throws AdaptorException {
String payload = validatePayload(sessionId, inbound);
String payload = validatePayload(sessionId, inbound, false);
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
@ -51,7 +54,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.PostAttributeMsg convertToPostAttributes(UUID sessionId, Request inbound) throws AdaptorException {
String payload = validatePayload(sessionId, inbound);
String payload = validatePayload(sessionId, inbound, false);
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
@ -79,7 +82,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(UUID sessionId, Request inbound) throws AdaptorException {
Optional<Integer> requestId = CoapTransportResource.getRequestId(inbound);
String payload = validatePayload(sessionId, inbound);
String payload = validatePayload(sessionId, inbound, false);
JsonObject response = new JsonParser().parse(payload).getAsJsonObject();
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId.orElseThrow(() -> new AdaptorException("Request id is missing!")))
.setPayload(response.toString()).build();
@ -87,10 +90,21 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
@Override
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(UUID sessionId, Request inbound) throws AdaptorException {
String payload = validatePayload(sessionId, inbound);
String payload = validatePayload(sessionId, inbound, false);
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
}
@Override
public TransportProtos.ClaimDeviceMsg convertToClaimDevice(UUID sessionId, Request inbound, TransportProtos.SessionInfoProto sessionInfo) throws AdaptorException {
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
String payload = validatePayload(sessionId, inbound, true);
try {
return JsonConverter.convertToClaimDeviceProto(deviceId, payload);
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
@Override
public Response convertToPublish(CoapTransportResource.CoapSessionListener session, TransportProtos.AttributeUpdateNotificationMsg msg) throws AdaptorException {
return getObserveNotification(session.getNextSeqNumber(), JsonConverter.toJson(msg));
@ -128,11 +142,13 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
return response;
}
private String validatePayload(UUID sessionId, Request inbound) throws AdaptorException {
private String validatePayload(UUID sessionId, Request inbound, boolean isEmptyPayloadAllowed) throws AdaptorException {
String payload = inbound.getPayloadString();
if (payload == null) {
log.warn("[{}] Payload is empty!", sessionId);
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
if (!isEmptyPayloadAllowed) {
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
}
}
return payload;
}

16
common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java

@ -20,7 +20,6 @@ import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
@ -31,6 +30,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
@ -119,6 +119,20 @@ public class DeviceApiController {
return responseWriter;
}
@RequestMapping(value = "/{deviceToken}/claim", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> claimDevice(@PathVariable("deviceToken") String deviceToken,
@RequestBody(required = false) String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
transportService.process(sessionInfo, JsonConverter.convertToClaimDeviceProto(deviceId, json),
new HttpOkCallback(responseWriter));
}));
return responseWriter;
}
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,

2
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java

@ -29,6 +29,7 @@ public class MqttTopics {
public static final String DEVICE_ATTRIBUTES_RESPONSES_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + "+";
public static final String DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC + "/attributes/request/";
public static final String DEVICE_TELEMETRY_TOPIC = BASE_DEVICE_API_TOPIC + "/telemetry";
public static final String DEVICE_CLAIM_TOPIC = BASE_DEVICE_API_TOPIC + "/claim";
public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes";
public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway";
@ -36,6 +37,7 @@ public class MqttTopics {
public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/disconnect";
public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes";
public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry";
public static final String GATEWAY_CLAIM_TOPIC = BASE_GATEWAY_API_TOPIC + "/claim";
public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc";
public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/request";
public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/response";

8
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -38,11 +38,11 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.transport.service.AbstractTransportService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
@ -183,6 +183,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_TELEMETRY_TOPIC:
gatewaySessionHandler.onDeviceTelemetry(mqttMsg);
break;
case MqttTopics.GATEWAY_CLAIM_TOPIC:
gatewaySessionHandler.onDeviceClaim(mqttMsg);
break;
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
gatewaySessionHandler.onDeviceAttributes(mqttMsg);
break;
@ -221,6 +224,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
TransportProtos.ClaimDeviceMsg claimDeviceMsg = adaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
transportService.process(sessionInfo, claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);

24
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java

@ -57,7 +57,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
@Override
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
try {
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
@ -67,7 +67,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
@Override
public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
try {
return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
@ -114,7 +114,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
@Override
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
try {
Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
@ -123,6 +123,16 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
@Override
public TransportProtos.ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload(), true);
try {
return JsonConverter.convertToClaimDeviceProto(ctx.getDeviceId(), payload);
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
@ -193,7 +203,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
String payload = validatePayload(sessionId, payloadData);
String payload = validatePayload(sessionId, payloadData, false);
try {
return new JsonParser().parse(payload);
} catch (JsonSyntaxException ex) {
@ -201,12 +211,14 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
private static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
private static String validatePayload(UUID sessionId, ByteBuf payloadData, boolean isEmptyPayloadAllowed) throws AdaptorException {
try {
String payload = payloadData.toString(UTF8);
if (payload == null) {
log.warn("[{}] Payload is empty!", sessionId);
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
if (!isEmptyPayloadAllowed) {
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
}
}
return payload;
} finally {

4
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java

@ -19,6 +19,7 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
@ -46,6 +47,8 @@ public interface MqttTransportAdaptor {
ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
@ -59,4 +62,5 @@ public interface MqttTransportAdaptor {
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException;
}

39
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java

@ -30,6 +30,7 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@ -183,7 +184,42 @@ public class GatewaySessionHandler {
@Override
public void onFailure(Throwable t) {
log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t);
log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
public void onDeviceClaim(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
int msgId = mqttMsg.variableHeader().packetId();
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = deviceEntry.getKey();
Futures.addCallback(checkDeviceConnected(deviceName),
new FutureCallback<GatewayDeviceSessionCtx>() {
@Override
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
if (!deviceEntry.getValue().isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
try {
DeviceId deviceId = deviceCtx.getDeviceId();
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue());
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
} catch (Throwable e) {
UUID gatewayId = new UUID(gateway.getDeviceIdMSB(), gateway.getDeviceIdLSB());
log.warn("[{}][{}] Failed to convert claim message: {}", gatewayId, deviceName, deviceEntry.getValue(), e);
}
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}] Failed to process device claiming command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
}
@ -209,6 +245,7 @@ public class GatewaySessionHandler {
TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(deviceEntry.getValue().getAsJsonObject());
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);

15
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java

@ -16,18 +16,19 @@
package org.thingsboard.server.common.transport;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
/**
* Created by ashvayka on 04.10.18.
@ -63,6 +64,8 @@ public interface TransportService {
void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);

93
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java

@ -24,6 +24,8 @@ import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
@ -63,23 +66,69 @@ public class JsonConverter {
private static int maxStringValueLength = 0;
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonObject) throws JsonSyntaxException {
long systemTs = System.currentTimeMillis();
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException {
PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
if (jsonObject.isJsonObject()) {
parseObject(builder, systemTs, jsonObject);
} else if (jsonObject.isJsonArray()) {
jsonObject.getAsJsonArray().forEach(je -> {
convertToTelemetry(jsonElement, System.currentTimeMillis(), null, builder);
return builder.build();
}
private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder) {
if (jsonElement.isJsonObject()) {
parseObject(systemTs, result, builder, jsonElement.getAsJsonObject());
} else if (jsonElement.isJsonArray()) {
jsonElement.getAsJsonArray().forEach(je -> {
if (je.isJsonObject()) {
parseObject(builder, systemTs, je.getAsJsonObject());
parseObject(systemTs, result, builder, je.getAsJsonObject());
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
}
});
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonElement);
}
return builder.build();
}
private static void parseObject(long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder, JsonObject jo) {
if (result != null) {
parseObject(result, systemTs, jo);
} else {
parseObject(builder, systemTs, jo);
}
}
public static ClaimDeviceMsg convertToClaimDeviceProto(DeviceId deviceId, String json) {
long durationMs = 0L;
if (json != null && !json.isEmpty()) {
return convertToClaimDeviceProto(deviceId, new JsonParser().parse(json));
}
return buildClaimDeviceMsg(deviceId, DataConstants.DEFAULT_SECRET_KEY, durationMs);
}
public static ClaimDeviceMsg convertToClaimDeviceProto(DeviceId deviceId, JsonElement jsonElement) {
String secretKey = DataConstants.DEFAULT_SECRET_KEY;
long durationMs = 0L;
if (jsonElement.isJsonObject()) {
JsonObject jo = jsonElement.getAsJsonObject();
if (jo.has(DataConstants.SECRET_KEY_FIELD_NAME)) {
secretKey = jo.get(DataConstants.SECRET_KEY_FIELD_NAME).getAsString();
}
if (jo.has(DataConstants.DURATION_MS_FIELD_NAME)) {
durationMs = jo.get(DataConstants.DURATION_MS_FIELD_NAME).getAsLong();
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonElement);
}
return buildClaimDeviceMsg(deviceId, secretKey, durationMs);
}
private static ClaimDeviceMsg buildClaimDeviceMsg(DeviceId deviceId, String secretKey, long durationMs) {
ClaimDeviceMsg.Builder result = ClaimDeviceMsg.newBuilder();
return result
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setSecretKey(secretKey)
.setDurationMs(durationMs)
.build();
}
public static PostAttributeMsg convertToAttributesProto(JsonElement jsonObject) throws JsonSyntaxException {
@ -103,8 +152,7 @@ public class JsonConverter {
return result;
}
private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) {
JsonObject jo = jsonObject.getAsJsonObject();
private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonObject jo) {
if (jo.has("ts") && jo.has("values")) {
parseWithTs(builder, jo);
} else {
@ -137,7 +185,7 @@ public class JsonConverter {
String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength);
throw new JsonSyntaxException(message);
}
if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
if (isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
try {
result.add(buildNumericKeyValueProto(value, valueEntry.getKey()));
} catch (RuntimeException th) {
@ -400,7 +448,7 @@ public class JsonConverter {
String message = String.format("String value length [%d] for key [%s] is greater than maximum allowed [%d]", value.getAsString().length(), valueEntry.getKey(), maxStringValueLength);
throw new JsonSyntaxException(message);
}
if(isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
if (isTypeCastEnabled && NumberUtils.isParsable(value.getAsString())) {
try {
parseNumericValue(result, valueEntry, value);
} catch (RuntimeException th) {
@ -423,26 +471,13 @@ public class JsonConverter {
return result;
}
public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonObject, long systemTs) throws JsonSyntaxException {
public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException {
Map<Long, List<KvEntry>> result = new HashMap<>();
if (jsonObject.isJsonObject()) {
parseObject(result, systemTs, jsonObject);
} else if (jsonObject.isJsonArray()) {
jsonObject.getAsJsonArray().forEach(je -> {
if (je.isJsonObject()) {
parseObject(result, systemTs, je.getAsJsonObject());
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
}
});
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonObject);
}
convertToTelemetry(jsonElement, systemTs, result, null);
return result;
}
private static void parseObject(Map<Long, List<KvEntry>> result, long systemTs, JsonElement jsonObject) {
JsonObject jo = jsonObject.getAsJsonObject();
private static void parseObject(Map<Long, List<KvEntry>> result, long systemTs, JsonObject jo) {
if (jo.has("ts") && jo.has("values")) {
parseWithTs(result, jo);
} else {

15
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java

@ -28,7 +28,12 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Created by ashvayka on 17.10.18.
@ -127,6 +132,12 @@ public abstract class AbstractTransportService implements TransportService {
}
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg,
TransportServiceCallback<Void> callback) {
registerClaimingInfo(sessionInfo, msg, callback);
}
@Override
public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
reportActivityInternal(sessionInfo);
@ -148,6 +159,8 @@ public abstract class AbstractTransportService implements TransportService {
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
protected abstract void registerClaimingInfo(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
UUID sessionId = toId(sessionInfo);
SessionMetaData sessionMetaData = sessions.get(sessionId);

35
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java

@ -24,34 +24,40 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.kafka.*;
import org.thingsboard.server.kafka.AsyncCallbackTemplate;
import org.thingsboard.server.kafka.TBKafkaAdmin;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.kafka.TbNodeIdProvider;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -305,6 +311,15 @@ public class RemoteTransportService extends AbstractTransportService {
send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
protected void registerClaimingInfo(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setClaimDevice(msg).build()
).build();
send(sessionInfo, toRuleEngineMsg, callback);
}
private static class TransportCallbackAdaptor implements Callback {
private final TransportServiceCallback<Void> callback;

8
common/transport/transport-api/src/main/proto/transport.proto

@ -172,6 +172,13 @@ message ToServerRpcResponseMsg {
string error = 3;
}
message ClaimDeviceMsg {
int64 deviceIdMSB = 1;
int64 deviceIdLSB = 2;
string secretKey = 3;
int64 durationMs = 4;
}
//Used to report session state to tb-node and persist this state in the cache on the tb-node level.
message SubscriptionInfoProto {
int64 lastActivityTime = 1;
@ -199,6 +206,7 @@ message TransportToDeviceActorMsg {
ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8;
ToServerRpcRequestMsg toServerRPCCallRequest = 9;
SubscriptionInfoProto subscriptionInfo = 10;
ClaimDeviceMsg claimDevice = 11;
}
message DeviceActorToTransportMsg {

35
dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesService.java

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.device;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.device.claim.ClaimResponse;
import java.util.List;
public interface ClaimDevicesService {
ListenableFuture<Void> registerClaimingInfo(TenantId tenantId, DeviceId deviceId, String secretKey, long durationMs);
ListenableFuture<ClaimResponse> claimDevice(Device device, CustomerId customerId, String secretKey);
ListenableFuture<List<Void>> reClaimDevice(TenantId tenantId, Device device);
}

171
dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java

@ -0,0 +1,171 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.device;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.claim.ClaimData;
import org.thingsboard.server.dao.device.claim.ClaimResponse;
import org.thingsboard.server.dao.model.ModelConstants;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.thingsboard.server.common.data.CacheConstants.CLAIM_DEVICES_CACHE;
@Service
@Slf4j
public class ClaimDevicesServiceImpl implements ClaimDevicesService {
private static final String CLAIM_ATTRIBUTE_NAME = "claimingAllowed";
@Autowired
private DeviceService deviceService;
@Autowired
private AttributesService attributesService;
@Autowired
private CacheManager cacheManager;
@Value("${security.claim.allowClaimingByDefault}")
private boolean isAllowedClaimingByDefault;
@Value("${security.claim.duration}")
private long systemDurationMs;
@Override
public ListenableFuture<Void> registerClaimingInfo(TenantId tenantId, DeviceId deviceId, String secretKey, long durationMs) {
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
return Futures.transformAsync(deviceFuture, device -> {
Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE);
List<Object> key = constructCacheKey(device.getId());
if (isAllowedClaimingByDefault) {
if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
persistInCache(secretKey, durationMs, cache, key);
return Futures.immediateFuture(null);
}
log.warn("The device [{}] has been already claimed!", device.getName());
throw new IllegalArgumentException();
} else {
ListenableFuture<List<AttributeKvEntry>> claimingAllowedFuture = attributesService.find(tenantId, device.getId(),
DataConstants.SERVER_SCOPE, Collections.singletonList(CLAIM_ATTRIBUTE_NAME));
return Futures.transform(claimingAllowedFuture, list -> {
if (list != null && !list.isEmpty()) {
Optional<Boolean> claimingAllowedOptional = list.get(0).getBooleanValue();
if (claimingAllowedOptional.isPresent() && claimingAllowedOptional.get()
&& device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
persistInCache(secretKey, durationMs, cache, key);
return null;
}
}
log.warn("Failed to find claimingAllowed attribute for device or it is already claimed![{}]", device.getName());
throw new IllegalArgumentException();
});
}
});
}
@Override
public ListenableFuture<ClaimResponse> claimDevice(Device device, CustomerId customerId, String secretKey) {
List<Object> key = constructCacheKey(device.getId());
Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE);
ClaimData claimData = cache.get(key, ClaimData.class);
if (claimData != null) {
long currTs = System.currentTimeMillis();
if (currTs > claimData.getExpirationTime() || !secretKey.equals(claimData.getSecretKey())) {
log.warn("The claiming timeout occurred or wrong 'secretKey' provided for the device [{}]", device.getName());
cache.evict(key);
return Futures.immediateFuture(ClaimResponse.FAILURE);
} else {
if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
device.setCustomerId(customerId);
deviceService.saveDevice(device);
return Futures.transform(removeClaimingSavedData(cache, key, device), result -> ClaimResponse.SUCCESS);
}
return Futures.transform(removeClaimingSavedData(cache, key, device), result -> ClaimResponse.CLAIMED);
}
} else {
log.warn("Failed to find the device's claiming message![{}]", device.getName());
return Futures.immediateFuture(ClaimResponse.CLAIMED);
}
}
@Override
public ListenableFuture<List<Void>> reClaimDevice(TenantId tenantId, Device device) {
if (!device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
cacheEviction(device.getId());
device.setCustomerId(null);
deviceService.saveDevice(device);
if (isAllowedClaimingByDefault) {
return Futures.immediateFuture(Collections.emptyList());
}
return attributesService.save(tenantId, device.getId(), DataConstants.SERVER_SCOPE, Collections.singletonList(
new BaseAttributeKvEntry(new BooleanDataEntry(CLAIM_ATTRIBUTE_NAME, true),
System.currentTimeMillis())));
}
cacheEviction(device.getId());
return Futures.immediateFuture(Collections.emptyList());
}
private List<Object> constructCacheKey(DeviceId deviceId) {
return Collections.singletonList(deviceId);
}
private void persistInCache(String secretKey, long durationMs, Cache cache, List<Object> key) {
ClaimData claimData = new ClaimData(secretKey,
System.currentTimeMillis() + validateDurationMs(durationMs));
cache.putIfAbsent(key, claimData);
}
private long validateDurationMs(long durationMs) {
if (durationMs > 0L) {
return durationMs;
}
return systemDurationMs;
}
private ListenableFuture<List<Void>> removeClaimingSavedData(Cache cache, List<Object> key, Device device) {
cache.evict(key);
if (isAllowedClaimingByDefault) {
return Futures.immediateFuture(null);
}
return attributesService.removeAll(device.getTenantId(),
device.getId(), DataConstants.SERVER_SCOPE, Collections.singletonList(CLAIM_ATTRIBUTE_NAME));
}
private void cacheEviction(DeviceId deviceId) {
Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE);
cache.evict(constructCacheKey(deviceId));
}
}

28
dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimData.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.device.claim;
import lombok.AllArgsConstructor;
import lombok.Data;
@AllArgsConstructor
@Data
public class ClaimData {
private final String secretKey;
private final long expirationTime;
}

24
dao/src/main/java/org/thingsboard/server/dao/device/claim/ClaimResponse.java

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.device.claim;
public enum ClaimResponse {
SUCCESS,
FAILURE,
CLAIMED
}

7
dao/src/test/resources/application-test.properties

@ -27,11 +27,16 @@ caffeine.specs.assets.maxSize=100000
caffeine.specs.entityViews.timeToLiveInMinutes=1440
caffeine.specs.entityViews.maxSize=100000
caffeine.specs.claimDevices.timeToLiveInMinutes=1440
caffeine.specs.claimDevices.maxSize=100000
redis.connection.host=localhost
redis.connection.port=6379
redis.connection.db=0
redis.connection.password=
security.user_login_case_sensitive=true
security.claim.allowClaimingByDefault=true
security.claim.duration=60000
database.ts_max_intervals=700
database.ts_max_intervals=700

Loading…
Cancel
Save