diff --git a/application/src/main/java/org/thingsboard/server/controller/AssetController.java b/application/src/main/java/org/thingsboard/server/controller/AssetController.java index 4854c96941..d63674b7ee 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AssetController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AssetController.java @@ -16,9 +16,12 @@ package org.thingsboard.server.controller; import com.google.common.util.concurrent.ListenableFuture; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -34,7 +37,6 @@ import org.thingsboard.server.common.data.asset.AssetInfo; import org.thingsboard.server.common.data.asset.AssetSearchQuery; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.exception.ThingsboardException; @@ -48,6 +50,9 @@ import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.asset.AssetBulkImportService; +import org.thingsboard.server.service.importing.BulkImportRequest; +import org.thingsboard.server.service.importing.BulkImportResult; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; @@ -63,7 +68,10 @@ import static org.thingsboard.server.controller.EdgeController.EDGE_ID; @RestController @TbCoreComponent @RequestMapping("/api") +@RequiredArgsConstructor +@Slf4j public class AssetController extends BaseController { + private final AssetBulkImportService assetBulkImportService; public static final String ASSET_ID = "assetId"; @@ -108,13 +116,7 @@ public class AssetController extends BaseController { Asset savedAsset = checkNotNull(assetService.saveAsset(asset)); - logEntityAction(savedAsset.getId(), savedAsset, - savedAsset.getCustomerId(), - asset.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null); - - if (asset.getId() != null) { - sendEntityNotificationMsg(savedAsset.getTenantId(), savedAsset.getId(), EdgeEventActionType.UPDATED); - } + onAssetCreatedOrUpdated(savedAsset, asset.getId() != null); return savedAsset; } catch (Exception e) { @@ -124,6 +126,20 @@ public class AssetController extends BaseController { } } + private void onAssetCreatedOrUpdated(Asset asset, boolean updated) { + try { + logEntityAction(asset.getId(), asset, + asset.getCustomerId(), + updated ? ActionType.UPDATED : ActionType.ADDED, null); + } catch (ThingsboardException e) { + log.error("Failed to log entity action", e); + } + + if (updated) { + sendEntityNotificationMsg(asset.getTenantId(), asset.getId(), EdgeEventActionType.UPDATED); + } + } + @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/asset/{assetId}", method = RequestMethod.DELETE) @ResponseStatus(value = HttpStatus.OK) @@ -258,7 +274,7 @@ public class AssetController extends BaseController { try { TenantId tenantId = getCurrentUser().getTenantId(); PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); - if (type != null && type.trim().length()>0) { + if (type != null && type.trim().length() > 0) { return checkNotNull(assetService.findAssetsByTenantIdAndType(tenantId, type, pageLink)); } else { return checkNotNull(assetService.findAssetsByTenantId(tenantId, pageLink)); @@ -321,7 +337,7 @@ public class AssetController extends BaseController { CustomerId customerId = new CustomerId(toUUID(strCustomerId)); checkCustomerId(customerId, Operation.READ); PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); - if (type != null && type.trim().length()>0) { + if (type != null && type.trim().length() > 0) { return checkNotNull(assetService.findAssetsByTenantIdAndCustomerIdAndType(tenantId, customerId, type, pageLink)); } else { return checkNotNull(assetService.findAssetsByTenantIdAndCustomerId(tenantId, customerId, pageLink)); @@ -426,7 +442,7 @@ public class AssetController extends BaseController { @RequestMapping(value = "/edge/{edgeId}/asset/{assetId}", method = RequestMethod.POST) @ResponseBody public Asset assignAssetToEdge(@PathVariable(EDGE_ID) String strEdgeId, - @PathVariable(ASSET_ID) String strAssetId) throws ThingsboardException { + @PathVariable(ASSET_ID) String strAssetId) throws ThingsboardException { checkParameter(EDGE_ID, strEdgeId); checkParameter(ASSET_ID, strAssetId); try { @@ -444,7 +460,7 @@ public class AssetController extends BaseController { sendEntityAssignToEdgeNotificationMsg(getTenantId(), edgeId, savedAsset.getId(), EdgeEventActionType.ASSIGNED_TO_EDGE); - return savedAsset; + return savedAsset; } catch (Exception e) { logEntityAction(emptyId(EntityType.ASSET), null, @@ -530,4 +546,13 @@ public class AssetController extends BaseController { throw handleException(e); } } + + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @PostMapping("/asset/bulk_import") + public BulkImportResult processAssetsBulkImport(@RequestBody BulkImportRequest request) throws Exception { + return assetBulkImportService.processBulkImport(request, getCurrentUser(), importedAssetInfo -> { + onAssetCreatedOrUpdated(importedAssetInfo.getEntity(), importedAssetInfo.isUpdated()); + }); + } + } diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 267cfe75e3..36e2bcc92a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -122,7 +122,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.action.RuleEngineEntityActionService; +import org.thingsboard.server.service.action.EntityActionService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.EdgeNotificationService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; @@ -277,7 +277,7 @@ public abstract class BaseController { protected EdgeRpcService edgeGrpcService; @Autowired - protected RuleEngineEntityActionService ruleEngineEntityActionService; + protected EntityActionService entityActionService; @Value("${server.log_controller_error_stack_trace}") @Getter @@ -817,13 +817,7 @@ public abstract class BaseController { protected void logEntityAction(User user, I entityId, E entity, CustomerId customerId, ActionType actionType, Exception e, Object... additionalInfo) throws ThingsboardException { - if (customerId == null || customerId.isNullUid()) { - customerId = user.getCustomerId(); - } - if (e == null) { - ruleEngineEntityActionService.pushEntityActionToRuleEngine(entityId, entity, user.getTenantId(), customerId, actionType, user, additionalInfo); - } - auditLogService.logEntityAction(user.getTenantId(), customerId, user.getId(), user.getName(), entityId, entity, actionType, e, additionalInfo); + entityActionService.logEntityAction(user, entityId, entity, customerId, actionType, e, additionalInfo); } diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index c261d50f23..904171976a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -19,10 +19,13 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -68,6 +71,9 @@ import org.thingsboard.server.dao.device.claim.ReclaimResult; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.device.DeviceBulkImportService; +import org.thingsboard.server.service.importing.BulkImportRequest; +import org.thingsboard.server.service.importing.BulkImportResult; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; @@ -84,7 +90,10 @@ import static org.thingsboard.server.controller.EdgeController.EDGE_ID; @RestController @TbCoreComponent @RequestMapping("/api") +@RequiredArgsConstructor +@Slf4j public class DeviceController extends BaseController { + private final DeviceBulkImportService deviceBulkImportService; private static final String DEVICE_ID = "deviceId"; private static final String DEVICE_NAME = "deviceName"; @@ -136,24 +145,7 @@ public class DeviceController extends BaseController { Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken)); - tbClusterService.onDeviceChange(savedDevice, null); - tbClusterService.pushMsgToCore(new DeviceNameOrTypeUpdateMsg(savedDevice.getTenantId(), - savedDevice.getId(), savedDevice.getName(), savedDevice.getType()), null); - tbClusterService.onEntityStateChange(savedDevice.getTenantId(), savedDevice.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); - - if (!created) { - sendEntityNotificationMsg(savedDevice.getTenantId(), savedDevice.getId(), EdgeEventActionType.UPDATED); - } - - logEntityAction(savedDevice.getId(), savedDevice, - savedDevice.getCustomerId(), - created ? ActionType.ADDED : ActionType.UPDATED, null); - - if (device.getId() == null) { - deviceStateService.onDeviceAdded(savedDevice); - } else { - deviceStateService.onDeviceUpdated(savedDevice); - } + onDeviceCreatedOrUpdated(savedDevice, !created); otaPackageStateService.update(savedDevice, oldDevice); @@ -166,6 +158,31 @@ public class DeviceController extends BaseController { } + private void onDeviceCreatedOrUpdated(Device device, boolean updated) { + tbClusterService.onDeviceChange(device, null); + tbClusterService.pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), + device.getId(), device.getName(), device.getType()), null); + tbClusterService.onEntityStateChange(device.getTenantId(), device.getId(), updated ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED); + + if (updated) { + sendEntityNotificationMsg(device.getTenantId(), device.getId(), EdgeEventActionType.UPDATED); + } + + try { + logEntityAction(device.getId(), device, + device.getCustomerId(), + updated ? ActionType.UPDATED : ActionType.ADDED, null); + } catch (ThingsboardException e) { + log.error("Failed to log entity action", e); + } + + if (updated) { + deviceStateService.onDeviceUpdated(device); + } else { + deviceStateService.onDeviceAdded(device); + } + } + @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/device/{deviceId}", method = RequestMethod.DELETE) @ResponseStatus(value = HttpStatus.OK) @@ -797,4 +814,14 @@ public class DeviceController extends BaseController { throw handleException(e); } } + + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @PostMapping("/device/bulk_import") + public BulkImportResult processDevicesBulkImport(@RequestBody BulkImportRequest request) throws Exception { + return deviceBulkImportService.processBulkImport(request, getCurrentUser(), importedDeviceInfo -> { + onDeviceCreatedOrUpdated(importedDeviceInfo.getEntity(), importedDeviceInfo.isUpdated()); + otaPackageStateService.update(importedDeviceInfo.getEntity(), importedDeviceInfo.getOldEntity()); + }); + } + } diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 426c899588..2b1b6611d7 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -16,9 +16,11 @@ package org.thingsboard.server.controller; import com.google.common.util.concurrent.ListenableFuture; +import lombok.RequiredArgsConstructor; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -49,10 +51,14 @@ import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.EdgeBulkImportService; +import org.thingsboard.server.service.importing.BulkImportRequest; +import org.thingsboard.server.service.importing.BulkImportResult; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -60,7 +66,9 @@ import java.util.stream.Collectors; @RestController @TbCoreComponent @RequestMapping("/api") +@RequiredArgsConstructor public class EdgeController extends BaseController { + private final EdgeBulkImportService edgeBulkImportService; public static final String EDGE_ID = "edgeId"; @@ -128,17 +136,8 @@ public class EdgeController extends BaseController { edge.getId(), edge); Edge savedEdge = checkNotNull(edgeService.saveEdge(edge, true)); + onEdgeCreatedOrUpdated(tenantId, savedEdge, edgeTemplateRootRuleChain, !created); - if (created) { - ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), savedEdge.getId()); - edgeNotificationService.setEdgeRootRuleChain(tenantId, savedEdge, edgeTemplateRootRuleChain.getId()); - edgeService.assignDefaultRuleChainsToEdge(tenantId, savedEdge.getId()); - } - - tbClusterService.onEntityStateChange(savedEdge.getTenantId(), savedEdge.getId(), - created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); - - logEntityAction(savedEdge.getId(), savedEdge, null, created ? ActionType.ADDED : ActionType.UPDATED, null); return savedEdge; } catch (Exception e) { logEntityAction(emptyId(EntityType.EDGE), edge, @@ -147,6 +146,19 @@ public class EdgeController extends BaseController { } } + private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated) throws IOException, ThingsboardException { + if (!updated) { + ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId()); + edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId()); + edgeService.assignDefaultRuleChainsToEdge(tenantId, edge.getId()); + } + + tbClusterService.onEntityStateChange(edge.getTenantId(), edge.getId(), + updated ? ComponentLifecycleEvent.UPDATED : ComponentLifecycleEvent.CREATED); + + logEntityAction(edge.getId(), edge, null, updated ? ActionType.UPDATED : ActionType.ADDED, null); + } + @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/edge/{edgeId}", method = RequestMethod.DELETE) @ResponseStatus(value = HttpStatus.OK) @@ -580,6 +592,24 @@ public class EdgeController extends BaseController { } } + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @PostMapping("/edge/bulk_import") + public BulkImportResult processEdgeBulkImport(@RequestBody BulkImportRequest request) throws Exception { + SecurityUser user = getCurrentUser(); + RuleChain edgeTemplateRootRuleChain = ruleChainService.getEdgeTemplateRootRuleChain(user.getTenantId()); + if (edgeTemplateRootRuleChain == null) { + throw new DataValidationException("Root edge rule chain is not available!"); + } + + return edgeBulkImportService.processBulkImport(request, user, importedAssetInfo -> { + try { + onEdgeCreatedOrUpdated(user.getTenantId(), importedAssetInfo.getEntity(), edgeTemplateRootRuleChain, importedAssetInfo.isUpdated()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + private void cleanUpLicenseKey(Edge edge) { edge.setEdgeLicenseKey(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/action/RuleEngineEntityActionService.java b/application/src/main/java/org/thingsboard/server/service/action/EntityActionService.java similarity index 93% rename from application/src/main/java/org/thingsboard/server/service/action/RuleEngineEntityActionService.java rename to application/src/main/java/org/thingsboard/server/service/action/EntityActionService.java index f1320d1a7a..ce151ea8cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/action/RuleEngineEntityActionService.java +++ b/application/src/main/java/org/thingsboard/server/service/action/EntityActionService.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -38,6 +39,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.audit.AuditLogService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.queue.TbClusterService; @@ -49,8 +51,9 @@ import java.util.stream.Collectors; @Service @RequiredArgsConstructor @Slf4j -public class RuleEngineEntityActionService { +public class EntityActionService { private final TbClusterService tbClusterService; + private final AuditLogService auditLogService; private static final ObjectMapper json = new ObjectMapper(); @@ -209,6 +212,17 @@ public class RuleEngineEntityActionService { } } + public void logEntityAction(User user, I entityId, E entity, CustomerId customerId, + ActionType actionType, Exception e, Object... additionalInfo) { + if (customerId == null || customerId.isNullUid()) { + customerId = user.getCustomerId(); + } + if (e == null) { + pushEntityActionToRuleEngine(entityId, entity, user.getTenantId(), customerId, actionType, user, additionalInfo); + } + auditLogService.logEntityAction(user.getTenantId(), customerId, user.getId(), user.getName(), entityId, entity, actionType, e, additionalInfo); + } + private T extractParameter(Class clazz, int index, Object... additionalInfo) { T result = null; diff --git a/application/src/main/java/org/thingsboard/server/service/asset/AssetBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/asset/AssetBulkImportService.java new file mode 100644 index 0000000000..e5a6c4f182 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/asset/AssetBulkImportService.java @@ -0,0 +1,94 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.asset; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.action.EntityActionService; +import org.thingsboard.server.service.importing.AbstractBulkImportService; +import org.thingsboard.server.service.importing.BulkImportRequest; +import org.thingsboard.server.service.importing.ImportedEntityInfo; +import org.thingsboard.server.service.queue.TbClusterService; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.permission.AccessControlService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; + +import java.util.Map; +import java.util.Optional; + + +@Service +@TbCoreComponent +public class AssetBulkImportService extends AbstractBulkImportService { + private final AssetService assetService; + + public AssetBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, + AccessControlService accessControlService, AccessValidator accessValidator, + EntityActionService entityActionService, TbClusterService clusterService, AssetService assetService) { + super(tsSubscriptionService, tenantProfileCache, accessControlService, accessValidator, entityActionService, clusterService); + this.assetService = assetService; + } + + @Override + protected ImportedEntityInfo saveEntity(BulkImportRequest importRequest, Map entityData, SecurityUser user) { + ImportedEntityInfo importedEntityInfo = new ImportedEntityInfo<>(); + + Asset asset = new Asset(); + asset.setTenantId(user.getTenantId()); + setAssetFields(asset, entityData); + + Asset existingAsset = assetService.findAssetByTenantIdAndName(user.getTenantId(), asset.getName()); + if (existingAsset != null && importRequest.getMapping().getUpdate()) { + importedEntityInfo.setOldEntity(new Asset(existingAsset)); + importedEntityInfo.setUpdated(true); + existingAsset.update(asset); + asset = existingAsset; + } + asset = assetService.saveAsset(asset); + + importedEntityInfo.setEntity(asset); + return importedEntityInfo; + } + + private void setAssetFields(Asset asset, Map data) { + ObjectNode additionalInfo = (ObjectNode) Optional.ofNullable(asset.getAdditionalInfo()).orElseGet(JacksonUtil::newObjectNode); + data.forEach((columnMapping, value) -> { + switch (columnMapping.getType()) { + case NAME: + asset.setName(value); + break; + case TYPE: + asset.setType(value); + break; + case LABEL: + asset.setLabel(value); + break; + case DESCRIPTION: + additionalInfo.set("description", new TextNode(value)); + break; + } + }); + asset.setAdditionalInfo(additionalInfo); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java new file mode 100644 index 0000000000..5f876c18e2 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java @@ -0,0 +1,242 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.device; + +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import lombok.SneakyThrows; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.DeviceProfileProvisionType; +import org.thingsboard.server.common.data.DeviceProfileType; +import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; +import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MClientCredentials; +import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; +import org.thingsboard.server.common.data.device.profile.DeviceProfileData; +import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.DisabledDeviceProfileProvisionConfiguration; +import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.common.data.security.DeviceCredentialsType; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.dao.device.DeviceCredentialsService; +import org.thingsboard.server.dao.device.DeviceProfileService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.action.EntityActionService; +import org.thingsboard.server.service.importing.AbstractBulkImportService; +import org.thingsboard.server.service.importing.BulkImportColumnType; +import org.thingsboard.server.service.importing.BulkImportRequest; +import org.thingsboard.server.service.importing.ImportedEntityInfo; +import org.thingsboard.server.service.queue.TbClusterService; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.permission.AccessControlService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; + +import java.util.EnumSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.thingsboard.server.dao.service.Validator.validateId; + +@Service +@TbCoreComponent +public class DeviceBulkImportService extends AbstractBulkImportService { + protected final DeviceService deviceService; + protected final DeviceCredentialsService deviceCredentialsService; + protected final DeviceProfileService deviceProfileService; + + public DeviceBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, + AccessControlService accessControlService, AccessValidator accessValidator, + EntityActionService entityActionService, TbClusterService clusterService, + DeviceService deviceService, DeviceCredentialsService deviceCredentialsService, + DeviceProfileService deviceProfileService) { + super(tsSubscriptionService, tenantProfileCache, accessControlService, accessValidator, entityActionService, clusterService); + this.deviceService = deviceService; + this.deviceCredentialsService = deviceCredentialsService; + this.deviceProfileService = deviceProfileService; + } + + @Override + protected ImportedEntityInfo saveEntity(BulkImportRequest importRequest, Map entityData, SecurityUser user) { + ImportedEntityInfo importedEntityInfo = new ImportedEntityInfo<>(); + + Device device = new Device(); + device.setTenantId(user.getTenantId()); + setDeviceFields(device, entityData); + + Device existingDevice = deviceService.findDeviceByTenantIdAndName(user.getTenantId(), device.getName()); + if (existingDevice != null && importRequest.getMapping().getUpdate()) { + importedEntityInfo.setOldEntity(new Device(existingDevice)); + importedEntityInfo.setUpdated(true); + existingDevice.updateDevice(device); + device = existingDevice; + } + + DeviceCredentials deviceCredentials = createDeviceCredentials(entityData); + if (deviceCredentials.getCredentialsType() != null) { + if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { + setUpLwM2mDeviceProfile(user.getTenantId(), device); + } + device = deviceService.saveDeviceWithCredentials(device, deviceCredentials); + } else { + device = deviceService.saveDevice(device); + } + + importedEntityInfo.setEntity(device); + + return importedEntityInfo; + } + + private void setDeviceFields(Device device, Map data) { + ObjectNode additionalInfo = (ObjectNode) Optional.ofNullable(device.getAdditionalInfo()).orElseGet(JacksonUtil::newObjectNode); + data.forEach((columnMapping, value) -> { + switch (columnMapping.getType()) { + case NAME: + device.setName(value); + break; + case TYPE: + device.setType(value); + break; + case LABEL: + device.setLabel(value); + break; + case DESCRIPTION: + additionalInfo.set("description", new TextNode(value)); + break; + case IS_GATEWAY: + additionalInfo.set("gateway", BooleanNode.valueOf(Boolean.parseBoolean(value))); + break; + } + device.setAdditionalInfo(additionalInfo); + }); + } + + @SneakyThrows + private DeviceCredentials createDeviceCredentials(Map data) { + Set columns = data.keySet().stream().map(BulkImportRequest.ColumnMapping::getType).collect(Collectors.toSet()); + + DeviceCredentials credentials = new DeviceCredentials(); + + if (columns.contains(BulkImportColumnType.ACCESS_TOKEN)) { + credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + credentials.setCredentialsId(getByColumnType(BulkImportColumnType.ACCESS_TOKEN, data)); + } else if (CollectionUtils.containsAny(columns, EnumSet.of(BulkImportColumnType.MQTT_CLIENT_ID, BulkImportColumnType.MQTT_USER_NAME, BulkImportColumnType.MQTT_PASSWORD))) { + credentials.setCredentialsType(DeviceCredentialsType.MQTT_BASIC); + + BasicMqttCredentials basicMqttCredentials = new BasicMqttCredentials(); + basicMqttCredentials.setClientId(getByColumnType(BulkImportColumnType.MQTT_CLIENT_ID, data)); + basicMqttCredentials.setUserName(getByColumnType(BulkImportColumnType.MQTT_USER_NAME, data)); + basicMqttCredentials.setPassword(getByColumnType(BulkImportColumnType.MQTT_PASSWORD, data)); + credentials.setCredentialsValue(JacksonUtil.toString(basicMqttCredentials)); + } else if (columns.contains(BulkImportColumnType.X509)) { + credentials.setCredentialsType(DeviceCredentialsType.X509_CERTIFICATE); + credentials.setCredentialsValue(getByColumnType(BulkImportColumnType.X509, data)); + } else if (columns.contains(BulkImportColumnType.LWM2M_CLIENT_ENDPOINT)) { + credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS); + ObjectNode lwm2mCredentials = JacksonUtil.newObjectNode(); + + ObjectNode client = JacksonUtil.newObjectNode(); + Stream.of(BulkImportColumnType.LWM2M_CLIENT_ENDPOINT, BulkImportColumnType.LWM2M_CLIENT_SECURITY_CONFIG_MODE, + BulkImportColumnType.LWM2M_CLIENT_IDENTITY, BulkImportColumnType.LWM2M_CLIENT_KEY, BulkImportColumnType.LWM2M_CLIENT_CERT) + .forEach(lwm2mClientProperty -> { + String value = getByColumnType(lwm2mClientProperty, data); + if (value != null) { + client.set(lwm2mClientProperty.getKey(), new TextNode(value)); + } + }); + + LwM2MClientCredentials lwM2MClientCredentials = JacksonUtil.treeToValue(client, LwM2MClientCredentials.class); + // so that only fields needed for specific type of lwM2MClientCredentials were saved in json + lwm2mCredentials.set("client", JacksonUtil.valueToTree(lwM2MClientCredentials)); + + ObjectNode bootstrapServer = JacksonUtil.newObjectNode(); + Stream.of(BulkImportColumnType.LWM2M_BOOTSTRAP_SERVER_SECURITY_MODE, BulkImportColumnType.LWM2M_BOOTSTRAP_SERVER_PUBLIC_KEY_OR_ID, + BulkImportColumnType.LWM2M_BOOTSTRAP_SERVER_SECRET_KEY) + .forEach(lwm2mBootstrapServerProperty -> { + String value = getByColumnType(lwm2mBootstrapServerProperty, data); + if (value != null) { + bootstrapServer.set(lwm2mBootstrapServerProperty.getKey(), new TextNode(value)); + } + }); + + ObjectNode lwm2mServer = JacksonUtil.newObjectNode(); + Stream.of(BulkImportColumnType.LWM2M_SERVER_SECURITY_MODE, BulkImportColumnType.LWM2M_SERVER_CLIENT_PUBLIC_KEY_OR_ID, + BulkImportColumnType.LWM2M_SERVER_CLIENT_SECRET_KEY) + .forEach(lwm2mServerProperty -> { + String value = getByColumnType(lwm2mServerProperty, data); + if (value != null) { + lwm2mServer.set(lwm2mServerProperty.getKey(), new TextNode(value)); + } + }); + + ObjectNode bootstrap = JacksonUtil.newObjectNode(); + bootstrap.set("bootstrapServer", bootstrapServer); + bootstrap.set("lwm2mServer", lwm2mServer); + lwm2mCredentials.set("bootstrap", bootstrap); + + credentials.setCredentialsValue(lwm2mCredentials.toString()); + } + + return credentials; + } + + private void setUpLwM2mDeviceProfile(TenantId tenantId, Device device) { + DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType()); + if (deviceProfile != null) { + if (deviceProfile.getTransportType() != DeviceTransportType.LWM2M) { + deviceProfile.setTransportType(DeviceTransportType.LWM2M); + deviceProfile.getProfileData().setTransportConfiguration(new Lwm2mDeviceProfileTransportConfiguration()); + deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); + device.setDeviceProfileId(deviceProfile.getId()); + } + } else { + deviceProfile = new DeviceProfile(); + deviceProfile.setTenantId(tenantId); + deviceProfile.setType(DeviceProfileType.DEFAULT); + deviceProfile.setName(device.getType()); + deviceProfile.setTransportType(DeviceTransportType.LWM2M); + deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED); + + DeviceProfileData deviceProfileData = new DeviceProfileData(); + DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration(); + DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration(); + DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null); + + deviceProfileData.setConfiguration(configuration); + deviceProfileData.setTransportConfiguration(transportConfiguration); + deviceProfileData.setProvisionConfiguration(provisionConfiguration); + deviceProfile.setProfileData(deviceProfileData); + + deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); + device.setDeviceProfileId(deviceProfile.getId()); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeBulkImportService.java new file mode 100644 index 0000000000..ab22e7227c --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeBulkImportService.java @@ -0,0 +1,105 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.dao.edge.EdgeService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.action.EntityActionService; +import org.thingsboard.server.service.importing.AbstractBulkImportService; +import org.thingsboard.server.service.importing.BulkImportRequest; +import org.thingsboard.server.service.importing.ImportedEntityInfo; +import org.thingsboard.server.service.queue.TbClusterService; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.permission.AccessControlService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; + +import java.util.Map; +import java.util.Optional; + +@Service +@TbCoreComponent +public class EdgeBulkImportService extends AbstractBulkImportService { + private final EdgeService edgeService; + + public EdgeBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, + AccessControlService accessControlService, AccessValidator accessValidator, + EntityActionService entityActionService, TbClusterService clusterService, EdgeService edgeService) { + super(tsSubscriptionService, tenantProfileCache, accessControlService, accessValidator, entityActionService, clusterService); + this.edgeService = edgeService; + } + + @Override + protected ImportedEntityInfo saveEntity(BulkImportRequest importRequest, Map entityData, SecurityUser user) { + ImportedEntityInfo importedEntityInfo = new ImportedEntityInfo<>(); + + Edge edge = new Edge(); + edge.setTenantId(user.getTenantId()); + setEdgeFields(edge, entityData); + + Edge existingEdge = edgeService.findEdgeByTenantIdAndName(user.getTenantId(), edge.getName()); + if (existingEdge != null && importRequest.getMapping().getUpdate()) { + importedEntityInfo.setOldEntity(new Edge(existingEdge)); + importedEntityInfo.setUpdated(true); + existingEdge.update(edge); + edge = existingEdge; + } + edge = edgeService.saveEdge(edge, true); + + importedEntityInfo.setEntity(edge); + return importedEntityInfo; + } + + private void setEdgeFields(Edge edge, Map data) { + ObjectNode additionalInfo = (ObjectNode) Optional.ofNullable(edge.getAdditionalInfo()).orElseGet(JacksonUtil::newObjectNode); + data.forEach((columnMapping, value) -> { + switch (columnMapping.getType()) { + case NAME: + edge.setName(value); + break; + case TYPE: + edge.setType(value); + break; + case LABEL: + edge.setLabel(value); + break; + case DESCRIPTION: + additionalInfo.set("description", new TextNode(value)); + break; + case EDGE_LICENSE_KEY: + edge.setEdgeLicenseKey(value); + break; + case CLOUD_ENDPOINT: + edge.setCloudEndpoint(value); + break; + case ROUTING_KEY: + edge.setRoutingKey(value); + break; + case SECRET: + edge.setSecret(value); + break; + } + }); + edge.setAdditionalInfo(additionalInfo); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/importing/AbstractBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/importing/AbstractBulkImportService.java new file mode 100644 index 0000000000..2de18e98e5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/importing/AbstractBulkImportService.java @@ -0,0 +1,196 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.importing; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.thingsboard.server.common.data.BaseData; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.controller.BaseController; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.service.action.EntityActionService; +import org.thingsboard.server.service.importing.BulkImportRequest.ColumnMapping; +import org.thingsboard.server.service.queue.TbClusterService; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.permission.AccessControlService; +import org.thingsboard.server.service.security.permission.Operation; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; +import org.thingsboard.server.utils.CsvUtils; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@RequiredArgsConstructor +public abstract class AbstractBulkImportService> { + protected final TelemetrySubscriptionService tsSubscriptionService; + protected final TbTenantProfileCache tenantProfileCache; + protected final AccessControlService accessControlService; + protected final AccessValidator accessValidator; + protected final EntityActionService entityActionService; + protected final TbClusterService clusterService; + + public final BulkImportResult processBulkImport(BulkImportRequest request, SecurityUser user, Consumer> onEntityImported) throws Exception { + BulkImportResult result = new BulkImportResult<>(); + + AtomicInteger i = new AtomicInteger(0); + if (request.getMapping().getHeader()) { + i.incrementAndGet(); + } + + parseData(request).forEach(entityData -> { + i.incrementAndGet(); + try { + ImportedEntityInfo importedEntityInfo = saveEntity(request, entityData, user); + onEntityImported.accept(importedEntityInfo); + + E entity = importedEntityInfo.getEntity(); + + saveKvs(user, entity, entityData); + + if (importedEntityInfo.isUpdated()) { + result.setUpdated(result.getUpdated() + 1); + } else { + result.setCreated(result.getCreated() + 1); + } + } catch (Exception e) { + result.setErrors(result.getErrors() + 1); + result.getErrorsList().add(String.format("Line %d: %s", i.get(), e.getMessage())); + } + }); + + return result; + } + + protected abstract ImportedEntityInfo saveEntity(BulkImportRequest importRequest, Map entityData, SecurityUser user); + + /* + * Attributes' values are firstly added to JsonObject in order to then make some type cast, + * because we get all values as strings from CSV + * */ + private void saveKvs(SecurityUser user, E entity, Map data) { + Stream.of(BulkImportColumnType.SHARED_ATTRIBUTE, BulkImportColumnType.SERVER_ATTRIBUTE, BulkImportColumnType.TIMESERIES) + .map(kvType -> { + JsonObject kvs = new JsonObject(); + data.entrySet().stream() + .filter(dataEntry -> dataEntry.getKey().getType() == kvType && + StringUtils.isNotEmpty(dataEntry.getKey().getKey())) + .forEach(dataEntry -> kvs.add(dataEntry.getKey().getKey(), new JsonPrimitive(dataEntry.getValue()))); + return Map.entry(kvType, kvs); + }) + .filter(kvsEntry -> kvsEntry.getValue().entrySet().size() > 0) + .forEach(kvsEntry -> { + BulkImportColumnType kvType = kvsEntry.getKey(); + if (kvType == BulkImportColumnType.SHARED_ATTRIBUTE || kvType == BulkImportColumnType.SERVER_ATTRIBUTE) { + saveAttributes(user, entity, kvsEntry, kvType); + } else { + saveTelemetry(user, entity, kvsEntry); + } + }); + } + + @SneakyThrows + private void saveTelemetry(SecurityUser user, E entity, Map.Entry kvsEntry) { + List timeseries = JsonConverter.convertToTelemetry(kvsEntry.getValue(), System.currentTimeMillis()).entrySet().stream() + .flatMap(entry -> entry.getValue().stream().map(kvEntry -> new BasicTsKvEntry(entry.getKey(), kvEntry))) + .collect(Collectors.toList()); + + accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entity.getId(), (result, tenantId, entityId) -> { + TenantProfile tenantProfile = tenantProfileCache.get(tenantId); + long tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays()); + tsSubscriptionService.saveAndNotify(tenantId, user.getCustomerId(), entityId, timeseries, tenantTtl, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, + ActionType.TIMESERIES_UPDATED, null, timeseries); + } + + @Override + public void onFailure(Throwable t) { + entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, + ActionType.TIMESERIES_UPDATED, BaseController.toException(t), timeseries); + throw new RuntimeException(t); + } + }); + }); + } + + @SneakyThrows + private void saveAttributes(SecurityUser user, E entity, Map.Entry kvsEntry, BulkImportColumnType kvType) { + String scope = kvType.getKey(); + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(kvsEntry.getValue())); + + accessValidator.validateEntityAndCallback(user, Operation.WRITE_ATTRIBUTES, entity.getId(), (result, tenantId, entityId) -> { + tsSubscriptionService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<>() { + + @Override + public void onSuccess(Void unused) { + entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, + null, ActionType.ATTRIBUTES_UPDATED, null, scope, attributes); + } + + @Override + public void onFailure(Throwable throwable) { + entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, + null, ActionType.ATTRIBUTES_UPDATED, BaseController.toException(throwable), + scope, attributes); + throw new RuntimeException(throwable); + } + + }); + }); + } + + protected final String getByColumnType(BulkImportColumnType bulkImportColumnType, Map data) { + return data.entrySet().stream().filter(entry -> entry.getKey().getType() == bulkImportColumnType).findFirst().map(Map.Entry::getValue).orElse(null); + } + + private List> parseData(BulkImportRequest request) throws Exception { + List> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter()); + if (request.getMapping().getHeader()) { + records.remove(0); + } + + List columnsMappings = request.getMapping().getColumns(); + + return records.stream() + .map(record -> Stream.iterate(0, i -> i < record.size(), i -> i + 1) + .map(i -> Map.entry(columnsMappings.get(i), record.get(i))) + .filter(entry -> StringUtils.isNotEmpty(entry.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .collect(Collectors.toList()); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/importing/BulkImportColumnType.java b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportColumnType.java new file mode 100644 index 0000000000..a0ef0a15bc --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportColumnType.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.importing; + +import lombok.Getter; +import org.thingsboard.server.common.data.DataConstants; + +public enum BulkImportColumnType { + NAME, + TYPE, + LABEL, + SHARED_ATTRIBUTE(DataConstants.SHARED_SCOPE), + SERVER_ATTRIBUTE(DataConstants.SERVER_SCOPE), + TIMESERIES, + ACCESS_TOKEN, + X509, + MQTT_CLIENT_ID, + MQTT_USER_NAME, + MQTT_PASSWORD, + LWM2M_CLIENT_ENDPOINT("endpoint"), + LWM2M_CLIENT_SECURITY_CONFIG_MODE("securityConfigClientMode"), + LWM2M_CLIENT_IDENTITY("identity"), + LWM2M_CLIENT_KEY("key"), + LWM2M_CLIENT_CERT("cert"), + LWM2M_BOOTSTRAP_SERVER_SECURITY_MODE("securityMode"), + LWM2M_BOOTSTRAP_SERVER_PUBLIC_KEY_OR_ID("clientPublicKeyOrId"), + LWM2M_BOOTSTRAP_SERVER_SECRET_KEY("clientSecretKey"), + LWM2M_SERVER_SECURITY_MODE("securityMode"), + LWM2M_SERVER_CLIENT_PUBLIC_KEY_OR_ID("clientPublicKeyOrId"), + LWM2M_SERVER_CLIENT_SECRET_KEY("clientSecretKey"), + IS_GATEWAY, + DESCRIPTION, + EDGE_LICENSE_KEY, + CLOUD_ENDPOINT, + ROUTING_KEY, + SECRET; + + @Getter + private String key; + + BulkImportColumnType() { + } + + BulkImportColumnType(String key) { + this.key = key; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/importing/BulkImportRequest.java b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportRequest.java new file mode 100644 index 0000000000..4059cc52d5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportRequest.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.importing; + +import lombok.Data; + +import java.util.List; + +@Data +public class BulkImportRequest { + private String file; + private Mapping mapping; + + @Data + public static class Mapping { + private List columns; + private Character delimiter; + private Boolean update; + private Boolean header; + } + + @Data + public static class ColumnMapping { + private BulkImportColumnType type; + private String key; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/importing/BulkImportResult.java b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportResult.java new file mode 100644 index 0000000000..d6fa6ccbf9 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/importing/BulkImportResult.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.importing; + +import lombok.Data; + +import java.util.LinkedList; +import java.util.List; + +@Data +public class BulkImportResult { + private int created = 0; + private int updated = 0; + private int errors = 0; + private List errorsList = new LinkedList<>(); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/importing/ImportedEntityInfo.java b/application/src/main/java/org/thingsboard/server/service/importing/ImportedEntityInfo.java new file mode 100644 index 0000000000..846444c1d5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/importing/ImportedEntityInfo.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.importing; + +import lombok.Data; + +@Data +public class ImportedEntityInfo { + private E entity; + private boolean isUpdated; + private E oldEntity; +} diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java index 051a6c92b2..1dfb4a67bf 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java @@ -26,7 +26,6 @@ import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.alarm.AlarmDao; @@ -34,17 +33,12 @@ import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantDao; -import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.action.RuleEngineEntityActionService; -import org.thingsboard.server.service.ttl.AbstractCleanUpService; +import org.thingsboard.server.service.action.EntityActionService; -import java.sql.Connection; -import java.sql.SQLException; import java.util.Date; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.TimeUnit; @TbCoreComponent @@ -60,7 +54,7 @@ public class AlarmsCleanUpService { private final AlarmDao alarmDao; private final AlarmService alarmService; private final RelationService relationService; - private final RuleEngineEntityActionService ruleEngineEntityActionService; + private final EntityActionService entityActionService; private final PartitionService partitionService; private final TbTenantProfileCache tenantProfileCache; @@ -90,7 +84,7 @@ public class AlarmsCleanUpService { toRemove.getData().forEach(alarmId -> { relationService.deleteEntityRelations(tenantId, alarmId); Alarm alarm = alarmService.deleteAlarm(tenantId, alarmId).getAlarm(); - ruleEngineEntityActionService.pushEntityActionToRuleEngine(alarm.getOriginator(), alarm, tenantId, null, ActionType.ALARM_DELETE, null); + entityActionService.pushEntityActionToRuleEngine(alarm.getOriginator(), alarm, tenantId, null, ActionType.ALARM_DELETE, null); }); totalRemoved += toRemove.getTotalElements(); diff --git a/application/src/main/java/org/thingsboard/server/utils/CsvUtils.java b/application/src/main/java/org/thingsboard/server/utils/CsvUtils.java new file mode 100644 index 0000000000..d8b6dff8a7 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/utils/CsvUtils.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.utils; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.input.CharSequenceReader; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CsvUtils { + + public static List> parseCsv(String content, Character delimiter) throws Exception { + CSVFormat csvFormat = delimiter.equals(',') ? CSVFormat.DEFAULT : CSVFormat.DEFAULT.withDelimiter(delimiter); + + List records; + try (CharSequenceReader reader = new CharSequenceReader(content)) { + records = csvFormat.parse(reader).getRecords(); + } + + return records.stream() + .map(record -> Stream.iterate(0, i -> i < record.size(), i -> i + 1) + .map(record::get) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java index 9abc619b48..c5519d329b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.validation.NoXss; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.Optional; @EqualsAndHashCode(callSuper = true) @Slf4j @@ -83,6 +84,7 @@ public class Device extends SearchTextBasedWithAdditionalInfo implemen this.setDeviceData(device.getDeviceData()); this.setFirmwareId(device.getFirmwareId()); this.setSoftwareId(device.getSoftwareId()); + Optional.ofNullable(device.getAdditionalInfo()).ifPresent(this::setAdditionalInfo); return this; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java index f9d64cb712..594e6b635f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java @@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.validation.NoXss; +import java.util.Optional; + @EqualsAndHashCode(callSuper = true) public class Asset extends SearchTextBasedWithAdditionalInfo implements HasName, HasTenantId, HasCustomerId { @@ -56,6 +58,15 @@ public class Asset extends SearchTextBasedWithAdditionalInfo implements this.label = asset.getLabel(); } + public void update(Asset asset) { + this.tenantId = asset.getTenantId(); + this.customerId = asset.getCustomerId(); + this.name = asset.getName(); + this.type = asset.getType(); + this.label = asset.getLabel(); + Optional.ofNullable(asset.getAdditionalInfo()).ifPresent(this::setAdditionalInfo); + } + public TenantId getTenantId() { return tenantId; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/credentials/lwm2m/LwM2MClientCredentials.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/credentials/lwm2m/LwM2MClientCredentials.java index adf0c2ae62..b05168f974 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/credentials/lwm2m/LwM2MClientCredentials.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/credentials/lwm2m/LwM2MClientCredentials.java @@ -26,7 +26,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes.Type(value = NoSecClientCredentials.class, name = "NO_SEC"), @JsonSubTypes.Type(value = PSKClientCredentials.class, name = "PSK"), @JsonSubTypes.Type(value = RPKClientCredentials.class, name = "RPK"), - @JsonSubTypes.Type(value = X509ClientCredentials.class, name = "X509")}) + @JsonSubTypes.Type(value = X509ClientCredentials.class, name = "X509") +}) public interface LwM2MClientCredentials { @JsonIgnore diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/Edge.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/Edge.java index 939bfb3648..7a858f8325 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/Edge.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/Edge.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.common.data.edge; -import com.fasterxml.jackson.databind.JsonNode; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -70,6 +69,19 @@ public class Edge extends SearchTextBasedWithAdditionalInfo implements H this.cloudEndpoint = edge.getCloudEndpoint(); } + public void update(Edge edge) { + this.tenantId = edge.getTenantId(); + this.customerId = edge.getCustomerId(); + this.rootRuleChainId = edge.getRootRuleChainId(); + this.type = edge.getType(); + this.label = edge.getLabel(); + this.name = edge.getName(); + this.routingKey = edge.getRoutingKey(); + this.secret = edge.getSecret(); + this.edgeLicenseKey = edge.getEdgeLicenseKey(); + this.cloudEndpoint = edge.getCloudEndpoint(); + } + @Override public String getSearchText() { return getName(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java index 473429d526..947de9391e 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport.adaptor; +import com.fasterxml.jackson.databind.JsonNode; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; @@ -577,6 +578,14 @@ public class JsonConverter { return GSON.toJson(element); } + public static JsonObject toJsonObject(Object o) { + return (JsonObject) GSON.toJsonTree(o); + } + + public static T fromJson(JsonElement element, Class type) { + return GSON.fromJson(element, type); + } + public static void setTypeCastEnabled(boolean enabled) { isTypeCastEnabled = enabled; } diff --git a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java index 18b7abb67e..da50963bd4 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java @@ -118,4 +118,9 @@ public class JacksonUtil { public static JsonNode valueToTree(T value) { return OBJECT_MAPPER.valueToTree(value); } + + public static T treeToValue(JsonNode tree, Class type) throws JsonProcessingException { + return OBJECT_MAPPER.treeToValue(tree, type); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index a954420f4f..e251e6a810 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -220,6 +220,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe if (foundDeviceCredentials == null) { deviceCredentialsService.createDeviceCredentials(savedDevice.getTenantId(), deviceCredentials); } else { + deviceCredentials.setId(foundDeviceCredentials.getId()); deviceCredentialsService.updateDeviceCredentials(device.getTenantId(), deviceCredentials); } }