|
|
|
@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.id.UUIDBased; |
|
|
|
import org.thingsboard.server.common.data.kv.Aggregation; |
|
|
|
import org.thingsboard.server.common.data.kv.AttributeKey; |
|
|
|
@ -127,7 +128,7 @@ public class TelemetryController extends BaseController { |
|
|
|
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr |
|
|
|
, @PathVariable("scope") String scope) throws ThingsboardException { |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, |
|
|
|
(result, entityId) -> getAttributeKeysCallback(result, entityId, scope)); |
|
|
|
(result, tenantId, entityId) -> getAttributeKeysCallback(result, tenantId, entityId, scope)); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -138,7 +139,7 @@ public class TelemetryController extends BaseController { |
|
|
|
@RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { |
|
|
|
SecurityUser user = getCurrentUser(); |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, |
|
|
|
(result, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr)); |
|
|
|
(result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr)); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -150,7 +151,7 @@ public class TelemetryController extends BaseController { |
|
|
|
@RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { |
|
|
|
SecurityUser user = getCurrentUser(); |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, |
|
|
|
(result, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr)); |
|
|
|
(result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr)); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -159,9 +160,7 @@ public class TelemetryController extends BaseController { |
|
|
|
public DeferredResult<ResponseEntity> getTimeseriesKeys( |
|
|
|
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, |
|
|
|
(result, entityId) -> { |
|
|
|
Futures.addCallback(tsService.findAllLatest(entityId), getTsKeysToResponseCallback(result)); |
|
|
|
}); |
|
|
|
(result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result))); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -173,7 +172,7 @@ public class TelemetryController extends BaseController { |
|
|
|
SecurityUser user = getCurrentUser(); |
|
|
|
|
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, |
|
|
|
(result, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr)); |
|
|
|
(result, tenantId, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -190,13 +189,13 @@ public class TelemetryController extends BaseController { |
|
|
|
@RequestParam(name = "agg", defaultValue = "NONE") String aggStr |
|
|
|
) throws ThingsboardException { |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityType, entityIdStr, |
|
|
|
(result, entityId) -> { |
|
|
|
(result, tenantId, entityId) -> { |
|
|
|
// If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
|
|
|
|
Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr); |
|
|
|
List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg)) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
|
|
|
|
Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result)); |
|
|
|
Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result)); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
@ -206,7 +205,7 @@ public class TelemetryController extends BaseController { |
|
|
|
public DeferredResult<ResponseEntity> saveDeviceAttributes(@PathVariable("deviceId") String deviceIdStr, @PathVariable("scope") String scope, |
|
|
|
@RequestBody JsonNode request) throws ThingsboardException { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndUuid(EntityType.DEVICE, deviceIdStr); |
|
|
|
return saveAttributes(entityId, scope, request); |
|
|
|
return saveAttributes(getTenantId(), entityId, scope, request); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -216,7 +215,7 @@ public class TelemetryController extends BaseController { |
|
|
|
@PathVariable("scope") String scope, |
|
|
|
@RequestBody JsonNode request) throws ThingsboardException { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); |
|
|
|
return saveAttributes(entityId, scope, request); |
|
|
|
return saveAttributes(getTenantId(), entityId, scope, request); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -226,7 +225,7 @@ public class TelemetryController extends BaseController { |
|
|
|
@PathVariable("scope") String scope, |
|
|
|
@RequestBody JsonNode request) throws ThingsboardException { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); |
|
|
|
return saveAttributes(entityId, scope, request); |
|
|
|
return saveAttributes(getTenantId(), entityId, scope, request); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -236,7 +235,7 @@ public class TelemetryController extends BaseController { |
|
|
|
@PathVariable("scope") String scope, |
|
|
|
@RequestBody String requestBody) throws ThingsboardException { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); |
|
|
|
return saveTelemetry(entityId, requestBody, 0L); |
|
|
|
return saveTelemetry(getTenantId(), entityId, requestBody, 0L); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -246,7 +245,7 @@ public class TelemetryController extends BaseController { |
|
|
|
@PathVariable("scope") String scope, @PathVariable("ttl") Long ttl, |
|
|
|
@RequestBody String requestBody) throws ThingsboardException { |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); |
|
|
|
return saveTelemetry(entityId, requestBody, ttl); |
|
|
|
return saveTelemetry(getTenantId(), entityId, requestBody, ttl); |
|
|
|
} |
|
|
|
|
|
|
|
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
|
|
|
@ -280,13 +279,13 @@ public class TelemetryController extends BaseController { |
|
|
|
deleteToTs = endTs; |
|
|
|
} |
|
|
|
|
|
|
|
return accessValidator.validateEntityAndCallback(user, entityIdStr, (result, entityId) -> { |
|
|
|
return accessValidator.validateEntityAndCallback(user, entityIdStr, (result, tenantId, entityId) -> { |
|
|
|
List<DeleteTsKvQuery> deleteTsKvQueries = new ArrayList<>(); |
|
|
|
for (String key : keys) { |
|
|
|
deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); |
|
|
|
} |
|
|
|
|
|
|
|
ListenableFuture<List<Void>> future = tsService.remove(entityId, deleteTsKvQueries); |
|
|
|
ListenableFuture<List<Void>> future = tsService.remove(user.getTenantId(), entityId, deleteTsKvQueries); |
|
|
|
Futures.addCallback(future, new FutureCallback<List<Void>>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable List<Void> tmp) { |
|
|
|
@ -333,8 +332,8 @@ public class TelemetryController extends BaseController { |
|
|
|
if (DataConstants.SERVER_SCOPE.equals(scope) || |
|
|
|
DataConstants.SHARED_SCOPE.equals(scope) || |
|
|
|
DataConstants.CLIENT_SCOPE.equals(scope)) { |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdStr, (result, entityId) -> { |
|
|
|
ListenableFuture<List<Void>> future = attributesService.removeAll(entityId, scope, keys); |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdStr, (result, tenantId, entityId) -> { |
|
|
|
ListenableFuture<List<Void>> future = attributesService.removeAll(user.getTenantId(), entityId, scope, keys); |
|
|
|
Futures.addCallback(future, new FutureCallback<List<Void>>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable List<Void> tmp) { |
|
|
|
@ -362,7 +361,7 @@ public class TelemetryController extends BaseController { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private DeferredResult<ResponseEntity> saveAttributes(EntityId entityIdSrc, String scope, JsonNode json) throws ThingsboardException { |
|
|
|
private DeferredResult<ResponseEntity> saveAttributes(TenantId srcTenantId, EntityId entityIdSrc, String scope, JsonNode json) throws ThingsboardException { |
|
|
|
if (!DataConstants.SERVER_SCOPE.equals(scope) && !DataConstants.SHARED_SCOPE.equals(scope)) { |
|
|
|
return getImmediateDeferredResult("Invalid scope: " + scope, HttpStatus.BAD_REQUEST); |
|
|
|
} |
|
|
|
@ -372,8 +371,8 @@ public class TelemetryController extends BaseController { |
|
|
|
return getImmediateDeferredResult("No attributes data found in request body!", HttpStatus.BAD_REQUEST); |
|
|
|
} |
|
|
|
SecurityUser user = getCurrentUser(); |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> { |
|
|
|
tsSubService.saveAndNotify(entityId, scope, attributes, new FutureCallback<Void>() { |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, tenantId, entityId) -> { |
|
|
|
tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<Void>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable Void tmp) { |
|
|
|
logAttributesUpdated(user, entityId, scope, attributes, null); |
|
|
|
@ -398,7 +397,7 @@ public class TelemetryController extends BaseController { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private DeferredResult<ResponseEntity> saveTelemetry(EntityId entityIdSrc, String requestBody, long ttl) throws ThingsboardException { |
|
|
|
private DeferredResult<ResponseEntity> saveTelemetry(TenantId curTenantId, EntityId entityIdSrc, String requestBody, long ttl) throws ThingsboardException { |
|
|
|
Map<Long, List<KvEntry>> telemetryRequest; |
|
|
|
JsonElement telemetryJson; |
|
|
|
try { |
|
|
|
@ -421,8 +420,8 @@ public class TelemetryController extends BaseController { |
|
|
|
return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST); |
|
|
|
} |
|
|
|
SecurityUser user = getCurrentUser(); |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, entityId) -> { |
|
|
|
tsSubService.saveAndNotify(entityId, entries, ttl, new FutureCallback<Void>() { |
|
|
|
return accessValidator.validateEntityAndCallback(getCurrentUser(), entityIdSrc, (result, tenantId, entityId) -> { |
|
|
|
tsSubService.saveAndNotify(tenantId, entityId, entries, ttl, new FutureCallback<Void>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable Void tmp) { |
|
|
|
result.setResult(new ResponseEntity(HttpStatus.OK)); |
|
|
|
@ -439,9 +438,9 @@ public class TelemetryController extends BaseController { |
|
|
|
private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys) { |
|
|
|
ListenableFuture<List<TsKvEntry>> future; |
|
|
|
if (StringUtils.isEmpty(keys)) { |
|
|
|
future = tsService.findAllLatest(entityId); |
|
|
|
future = tsService.findAllLatest(user.getTenantId(), entityId); |
|
|
|
} else { |
|
|
|
future = tsService.findLatest(entityId, toKeysList(keys)); |
|
|
|
future = tsService.findLatest(user.getTenantId(), entityId, toKeysList(keys)); |
|
|
|
} |
|
|
|
Futures.addCallback(future, getTsKvListCallback(result)); |
|
|
|
} |
|
|
|
@ -451,17 +450,17 @@ public class TelemetryController extends BaseController { |
|
|
|
FutureCallback<List<AttributeKvEntry>> callback = getAttributeValuesToResponseCallback(result, user, scope, entityId, keyList); |
|
|
|
if (!StringUtils.isEmpty(scope)) { |
|
|
|
if (keyList != null && !keyList.isEmpty()) { |
|
|
|
Futures.addCallback(attributesService.find(entityId, scope, keyList), callback); |
|
|
|
Futures.addCallback(attributesService.find(user.getTenantId(), entityId, scope, keyList), callback); |
|
|
|
} else { |
|
|
|
Futures.addCallback(attributesService.findAll(entityId, scope), callback); |
|
|
|
Futures.addCallback(attributesService.findAll(user.getTenantId(), entityId, scope), callback); |
|
|
|
} |
|
|
|
} else { |
|
|
|
List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>(); |
|
|
|
for (String tmpScope : DataConstants.allScopes()) { |
|
|
|
if (keyList != null && !keyList.isEmpty()) { |
|
|
|
futures.add(attributesService.find(entityId, tmpScope, keyList)); |
|
|
|
futures.add(attributesService.find(user.getTenantId(), entityId, tmpScope, keyList)); |
|
|
|
} else { |
|
|
|
futures.add(attributesService.findAll(entityId, tmpScope)); |
|
|
|
futures.add(attributesService.findAll(user.getTenantId(), entityId, tmpScope)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -471,14 +470,14 @@ public class TelemetryController extends BaseController { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void getAttributeKeysCallback(@Nullable DeferredResult<ResponseEntity> result, EntityId entityId, String scope) { |
|
|
|
Futures.addCallback(attributesService.findAll(entityId, scope), getAttributeKeysToResponseCallback(result)); |
|
|
|
private void getAttributeKeysCallback(@Nullable DeferredResult<ResponseEntity> result, TenantId tenantId, EntityId entityId, String scope) { |
|
|
|
Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), getAttributeKeysToResponseCallback(result)); |
|
|
|
} |
|
|
|
|
|
|
|
private void getAttributeKeysCallback(@Nullable DeferredResult<ResponseEntity> result, EntityId entityId) { |
|
|
|
private void getAttributeKeysCallback(@Nullable DeferredResult<ResponseEntity> result, TenantId tenantId, EntityId entityId) { |
|
|
|
List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>(); |
|
|
|
for (String scope : DataConstants.allScopes()) { |
|
|
|
futures.add(attributesService.findAll(entityId, scope)); |
|
|
|
futures.add(attributesService.findAll(tenantId, entityId, scope)); |
|
|
|
} |
|
|
|
|
|
|
|
ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures); |
|
|
|
|