22 changed files with 1025 additions and 61 deletions
@ -0,0 +1,94 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.asset; |
|||
|
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import com.fasterxml.jackson.databind.node.TextNode; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.asset.Asset; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.action.EntityActionService; |
|||
import org.thingsboard.server.service.importing.AbstractBulkImportService; |
|||
import org.thingsboard.server.service.importing.BulkImportRequest; |
|||
import org.thingsboard.server.service.importing.ImportedEntityInfo; |
|||
import org.thingsboard.server.service.queue.TbClusterService; |
|||
import org.thingsboard.server.service.security.AccessValidator; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
import org.thingsboard.server.service.security.permission.AccessControlService; |
|||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|||
|
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
|
|||
|
|||
@Service |
|||
@TbCoreComponent |
|||
public class AssetBulkImportService extends AbstractBulkImportService<Asset> { |
|||
private final AssetService assetService; |
|||
|
|||
public AssetBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, |
|||
AccessControlService accessControlService, AccessValidator accessValidator, |
|||
EntityActionService entityActionService, TbClusterService clusterService, AssetService assetService) { |
|||
super(tsSubscriptionService, tenantProfileCache, accessControlService, accessValidator, entityActionService, clusterService); |
|||
this.assetService = assetService; |
|||
} |
|||
|
|||
@Override |
|||
protected ImportedEntityInfo<Asset> saveEntity(BulkImportRequest importRequest, Map<BulkImportRequest.ColumnMapping, String> entityData, SecurityUser user) { |
|||
ImportedEntityInfo<Asset> importedEntityInfo = new ImportedEntityInfo<>(); |
|||
|
|||
Asset asset = new Asset(); |
|||
asset.setTenantId(user.getTenantId()); |
|||
setAssetFields(asset, entityData); |
|||
|
|||
Asset existingAsset = assetService.findAssetByTenantIdAndName(user.getTenantId(), asset.getName()); |
|||
if (existingAsset != null && importRequest.getMapping().getUpdate()) { |
|||
importedEntityInfo.setOldEntity(new Asset(existingAsset)); |
|||
importedEntityInfo.setUpdated(true); |
|||
existingAsset.update(asset); |
|||
asset = existingAsset; |
|||
} |
|||
asset = assetService.saveAsset(asset); |
|||
|
|||
importedEntityInfo.setEntity(asset); |
|||
return importedEntityInfo; |
|||
} |
|||
|
|||
private void setAssetFields(Asset asset, Map<BulkImportRequest.ColumnMapping, String> data) { |
|||
ObjectNode additionalInfo = (ObjectNode) Optional.ofNullable(asset.getAdditionalInfo()).orElseGet(JacksonUtil::newObjectNode); |
|||
data.forEach((columnMapping, value) -> { |
|||
switch (columnMapping.getType()) { |
|||
case NAME: |
|||
asset.setName(value); |
|||
break; |
|||
case TYPE: |
|||
asset.setType(value); |
|||
break; |
|||
case LABEL: |
|||
asset.setLabel(value); |
|||
break; |
|||
case DESCRIPTION: |
|||
additionalInfo.set("description", new TextNode(value)); |
|||
break; |
|||
} |
|||
}); |
|||
asset.setAdditionalInfo(additionalInfo); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,242 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.device; |
|||
|
|||
import com.fasterxml.jackson.databind.node.BooleanNode; |
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import com.fasterxml.jackson.databind.node.TextNode; |
|||
import com.google.gson.JsonObject; |
|||
import com.google.gson.JsonPrimitive; |
|||
import lombok.SneakyThrows; |
|||
import org.apache.commons.collections.CollectionUtils; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.DeviceProfile; |
|||
import org.thingsboard.server.common.data.DeviceProfileProvisionType; |
|||
import org.thingsboard.server.common.data.DeviceProfileType; |
|||
import org.thingsboard.server.common.data.DeviceTransportType; |
|||
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; |
|||
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MClientCredentials; |
|||
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; |
|||
import org.thingsboard.server.common.data.device.profile.DeviceProfileData; |
|||
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; |
|||
import org.thingsboard.server.common.data.device.profile.DisabledDeviceProfileProvisionConfiguration; |
|||
import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentials; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentialsType; |
|||
import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
|||
import org.thingsboard.server.dao.device.DeviceCredentialsService; |
|||
import org.thingsboard.server.dao.device.DeviceProfileService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.action.EntityActionService; |
|||
import org.thingsboard.server.service.importing.AbstractBulkImportService; |
|||
import org.thingsboard.server.service.importing.BulkImportColumnType; |
|||
import org.thingsboard.server.service.importing.BulkImportRequest; |
|||
import org.thingsboard.server.service.importing.ImportedEntityInfo; |
|||
import org.thingsboard.server.service.queue.TbClusterService; |
|||
import org.thingsboard.server.service.security.AccessValidator; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
import org.thingsboard.server.service.security.permission.AccessControlService; |
|||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|||
|
|||
import java.util.EnumSet; |
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
import java.util.Set; |
|||
import java.util.stream.Collectors; |
|||
import java.util.stream.Stream; |
|||
|
|||
import static org.thingsboard.server.dao.service.Validator.validateId; |
|||
|
|||
@Service |
|||
@TbCoreComponent |
|||
public class DeviceBulkImportService extends AbstractBulkImportService<Device> { |
|||
protected final DeviceService deviceService; |
|||
protected final DeviceCredentialsService deviceCredentialsService; |
|||
protected final DeviceProfileService deviceProfileService; |
|||
|
|||
public DeviceBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, |
|||
AccessControlService accessControlService, AccessValidator accessValidator, |
|||
EntityActionService entityActionService, TbClusterService clusterService, |
|||
DeviceService deviceService, DeviceCredentialsService deviceCredentialsService, |
|||
DeviceProfileService deviceProfileService) { |
|||
super(tsSubscriptionService, tenantProfileCache, accessControlService, accessValidator, entityActionService, clusterService); |
|||
this.deviceService = deviceService; |
|||
this.deviceCredentialsService = deviceCredentialsService; |
|||
this.deviceProfileService = deviceProfileService; |
|||
} |
|||
|
|||
@Override |
|||
protected ImportedEntityInfo<Device> saveEntity(BulkImportRequest importRequest, Map<BulkImportRequest.ColumnMapping, String> entityData, SecurityUser user) { |
|||
ImportedEntityInfo<Device> importedEntityInfo = new ImportedEntityInfo<>(); |
|||
|
|||
Device device = new Device(); |
|||
device.setTenantId(user.getTenantId()); |
|||
setDeviceFields(device, entityData); |
|||
|
|||
Device existingDevice = deviceService.findDeviceByTenantIdAndName(user.getTenantId(), device.getName()); |
|||
if (existingDevice != null && importRequest.getMapping().getUpdate()) { |
|||
importedEntityInfo.setOldEntity(new Device(existingDevice)); |
|||
importedEntityInfo.setUpdated(true); |
|||
existingDevice.updateDevice(device); |
|||
device = existingDevice; |
|||
} |
|||
|
|||
DeviceCredentials deviceCredentials = createDeviceCredentials(entityData); |
|||
if (deviceCredentials.getCredentialsType() != null) { |
|||
if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { |
|||
setUpLwM2mDeviceProfile(user.getTenantId(), device); |
|||
} |
|||
device = deviceService.saveDeviceWithCredentials(device, deviceCredentials); |
|||
} else { |
|||
device = deviceService.saveDevice(device); |
|||
} |
|||
|
|||
importedEntityInfo.setEntity(device); |
|||
|
|||
return importedEntityInfo; |
|||
} |
|||
|
|||
private void setDeviceFields(Device device, Map<BulkImportRequest.ColumnMapping, String> data) { |
|||
ObjectNode additionalInfo = (ObjectNode) Optional.ofNullable(device.getAdditionalInfo()).orElseGet(JacksonUtil::newObjectNode); |
|||
data.forEach((columnMapping, value) -> { |
|||
switch (columnMapping.getType()) { |
|||
case NAME: |
|||
device.setName(value); |
|||
break; |
|||
case TYPE: |
|||
device.setType(value); |
|||
break; |
|||
case LABEL: |
|||
device.setLabel(value); |
|||
break; |
|||
case DESCRIPTION: |
|||
additionalInfo.set("description", new TextNode(value)); |
|||
break; |
|||
case IS_GATEWAY: |
|||
additionalInfo.set("gateway", BooleanNode.valueOf(Boolean.parseBoolean(value))); |
|||
break; |
|||
} |
|||
device.setAdditionalInfo(additionalInfo); |
|||
}); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
private DeviceCredentials createDeviceCredentials(Map<BulkImportRequest.ColumnMapping, String> data) { |
|||
Set<BulkImportColumnType> columns = data.keySet().stream().map(BulkImportRequest.ColumnMapping::getType).collect(Collectors.toSet()); |
|||
|
|||
DeviceCredentials credentials = new DeviceCredentials(); |
|||
|
|||
if (columns.contains(BulkImportColumnType.ACCESS_TOKEN)) { |
|||
credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); |
|||
credentials.setCredentialsId(getByColumnType(BulkImportColumnType.ACCESS_TOKEN, data)); |
|||
} else if (CollectionUtils.containsAny(columns, EnumSet.of(BulkImportColumnType.MQTT_CLIENT_ID, BulkImportColumnType.MQTT_USER_NAME, BulkImportColumnType.MQTT_PASSWORD))) { |
|||
credentials.setCredentialsType(DeviceCredentialsType.MQTT_BASIC); |
|||
|
|||
BasicMqttCredentials basicMqttCredentials = new BasicMqttCredentials(); |
|||
basicMqttCredentials.setClientId(getByColumnType(BulkImportColumnType.MQTT_CLIENT_ID, data)); |
|||
basicMqttCredentials.setUserName(getByColumnType(BulkImportColumnType.MQTT_USER_NAME, data)); |
|||
basicMqttCredentials.setPassword(getByColumnType(BulkImportColumnType.MQTT_PASSWORD, data)); |
|||
credentials.setCredentialsValue(JacksonUtil.toString(basicMqttCredentials)); |
|||
} else if (columns.contains(BulkImportColumnType.X509)) { |
|||
credentials.setCredentialsType(DeviceCredentialsType.X509_CERTIFICATE); |
|||
credentials.setCredentialsValue(getByColumnType(BulkImportColumnType.X509, data)); |
|||
} else if (columns.contains(BulkImportColumnType.LWM2M_CLIENT_ENDPOINT)) { |
|||
credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS); |
|||
ObjectNode lwm2mCredentials = JacksonUtil.newObjectNode(); |
|||
|
|||
ObjectNode client = JacksonUtil.newObjectNode(); |
|||
Stream.of(BulkImportColumnType.LWM2M_CLIENT_ENDPOINT, BulkImportColumnType.LWM2M_CLIENT_SECURITY_CONFIG_MODE, |
|||
BulkImportColumnType.LWM2M_CLIENT_IDENTITY, BulkImportColumnType.LWM2M_CLIENT_KEY, BulkImportColumnType.LWM2M_CLIENT_CERT) |
|||
.forEach(lwm2mClientProperty -> { |
|||
String value = getByColumnType(lwm2mClientProperty, data); |
|||
if (value != null) { |
|||
client.set(lwm2mClientProperty.getKey(), new TextNode(value)); |
|||
} |
|||
}); |
|||
|
|||
LwM2MClientCredentials lwM2MClientCredentials = JacksonUtil.treeToValue(client, LwM2MClientCredentials.class); |
|||
// so that only fields needed for specific type of lwM2MClientCredentials were saved in json
|
|||
lwm2mCredentials.set("client", JacksonUtil.valueToTree(lwM2MClientCredentials)); |
|||
|
|||
ObjectNode bootstrapServer = JacksonUtil.newObjectNode(); |
|||
Stream.of(BulkImportColumnType.LWM2M_BOOTSTRAP_SERVER_SECURITY_MODE, BulkImportColumnType.LWM2M_BOOTSTRAP_SERVER_PUBLIC_KEY_OR_ID, |
|||
BulkImportColumnType.LWM2M_BOOTSTRAP_SERVER_SECRET_KEY) |
|||
.forEach(lwm2mBootstrapServerProperty -> { |
|||
String value = getByColumnType(lwm2mBootstrapServerProperty, data); |
|||
if (value != null) { |
|||
bootstrapServer.set(lwm2mBootstrapServerProperty.getKey(), new TextNode(value)); |
|||
} |
|||
}); |
|||
|
|||
ObjectNode lwm2mServer = JacksonUtil.newObjectNode(); |
|||
Stream.of(BulkImportColumnType.LWM2M_SERVER_SECURITY_MODE, BulkImportColumnType.LWM2M_SERVER_CLIENT_PUBLIC_KEY_OR_ID, |
|||
BulkImportColumnType.LWM2M_SERVER_CLIENT_SECRET_KEY) |
|||
.forEach(lwm2mServerProperty -> { |
|||
String value = getByColumnType(lwm2mServerProperty, data); |
|||
if (value != null) { |
|||
lwm2mServer.set(lwm2mServerProperty.getKey(), new TextNode(value)); |
|||
} |
|||
}); |
|||
|
|||
ObjectNode bootstrap = JacksonUtil.newObjectNode(); |
|||
bootstrap.set("bootstrapServer", bootstrapServer); |
|||
bootstrap.set("lwm2mServer", lwm2mServer); |
|||
lwm2mCredentials.set("bootstrap", bootstrap); |
|||
|
|||
credentials.setCredentialsValue(lwm2mCredentials.toString()); |
|||
} |
|||
|
|||
return credentials; |
|||
} |
|||
|
|||
private void setUpLwM2mDeviceProfile(TenantId tenantId, Device device) { |
|||
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileByName(tenantId, device.getType()); |
|||
if (deviceProfile != null) { |
|||
if (deviceProfile.getTransportType() != DeviceTransportType.LWM2M) { |
|||
deviceProfile.setTransportType(DeviceTransportType.LWM2M); |
|||
deviceProfile.getProfileData().setTransportConfiguration(new Lwm2mDeviceProfileTransportConfiguration()); |
|||
deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); |
|||
device.setDeviceProfileId(deviceProfile.getId()); |
|||
} |
|||
} else { |
|||
deviceProfile = new DeviceProfile(); |
|||
deviceProfile.setTenantId(tenantId); |
|||
deviceProfile.setType(DeviceProfileType.DEFAULT); |
|||
deviceProfile.setName(device.getType()); |
|||
deviceProfile.setTransportType(DeviceTransportType.LWM2M); |
|||
deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED); |
|||
|
|||
DeviceProfileData deviceProfileData = new DeviceProfileData(); |
|||
DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration(); |
|||
DeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration(); |
|||
DisabledDeviceProfileProvisionConfiguration provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null); |
|||
|
|||
deviceProfileData.setConfiguration(configuration); |
|||
deviceProfileData.setTransportConfiguration(transportConfiguration); |
|||
deviceProfileData.setProvisionConfiguration(provisionConfiguration); |
|||
deviceProfile.setProfileData(deviceProfileData); |
|||
|
|||
deviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); |
|||
device.setDeviceProfileId(deviceProfile.getId()); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,105 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge; |
|||
|
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import com.fasterxml.jackson.databind.node.TextNode; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.dao.edge.EdgeService; |
|||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.action.EntityActionService; |
|||
import org.thingsboard.server.service.importing.AbstractBulkImportService; |
|||
import org.thingsboard.server.service.importing.BulkImportRequest; |
|||
import org.thingsboard.server.service.importing.ImportedEntityInfo; |
|||
import org.thingsboard.server.service.queue.TbClusterService; |
|||
import org.thingsboard.server.service.security.AccessValidator; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
import org.thingsboard.server.service.security.permission.AccessControlService; |
|||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|||
|
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
|
|||
@Service |
|||
@TbCoreComponent |
|||
public class EdgeBulkImportService extends AbstractBulkImportService<Edge> { |
|||
private final EdgeService edgeService; |
|||
|
|||
public EdgeBulkImportService(TelemetrySubscriptionService tsSubscriptionService, TbTenantProfileCache tenantProfileCache, |
|||
AccessControlService accessControlService, AccessValidator accessValidator, |
|||
EntityActionService entityActionService, TbClusterService clusterService, EdgeService edgeService) { |
|||
super(tsSubscriptionService, tenantProfileCache, accessControlService, accessValidator, entityActionService, clusterService); |
|||
this.edgeService = edgeService; |
|||
} |
|||
|
|||
@Override |
|||
protected ImportedEntityInfo<Edge> saveEntity(BulkImportRequest importRequest, Map<BulkImportRequest.ColumnMapping, String> entityData, SecurityUser user) { |
|||
ImportedEntityInfo<Edge> importedEntityInfo = new ImportedEntityInfo<>(); |
|||
|
|||
Edge edge = new Edge(); |
|||
edge.setTenantId(user.getTenantId()); |
|||
setEdgeFields(edge, entityData); |
|||
|
|||
Edge existingEdge = edgeService.findEdgeByTenantIdAndName(user.getTenantId(), edge.getName()); |
|||
if (existingEdge != null && importRequest.getMapping().getUpdate()) { |
|||
importedEntityInfo.setOldEntity(new Edge(existingEdge)); |
|||
importedEntityInfo.setUpdated(true); |
|||
existingEdge.update(edge); |
|||
edge = existingEdge; |
|||
} |
|||
edge = edgeService.saveEdge(edge, true); |
|||
|
|||
importedEntityInfo.setEntity(edge); |
|||
return importedEntityInfo; |
|||
} |
|||
|
|||
private void setEdgeFields(Edge edge, Map<BulkImportRequest.ColumnMapping, String> data) { |
|||
ObjectNode additionalInfo = (ObjectNode) Optional.ofNullable(edge.getAdditionalInfo()).orElseGet(JacksonUtil::newObjectNode); |
|||
data.forEach((columnMapping, value) -> { |
|||
switch (columnMapping.getType()) { |
|||
case NAME: |
|||
edge.setName(value); |
|||
break; |
|||
case TYPE: |
|||
edge.setType(value); |
|||
break; |
|||
case LABEL: |
|||
edge.setLabel(value); |
|||
break; |
|||
case DESCRIPTION: |
|||
additionalInfo.set("description", new TextNode(value)); |
|||
break; |
|||
case EDGE_LICENSE_KEY: |
|||
edge.setEdgeLicenseKey(value); |
|||
break; |
|||
case CLOUD_ENDPOINT: |
|||
edge.setCloudEndpoint(value); |
|||
break; |
|||
case ROUTING_KEY: |
|||
edge.setRoutingKey(value); |
|||
break; |
|||
case SECRET: |
|||
edge.setSecret(value); |
|||
break; |
|||
} |
|||
}); |
|||
edge.setAdditionalInfo(additionalInfo); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,196 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.importing; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.gson.JsonObject; |
|||
import com.google.gson.JsonPrimitive; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.SneakyThrows; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.thingsboard.server.common.data.BaseData; |
|||
import org.thingsboard.server.common.data.TenantProfile; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.UUIDBased; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; |
|||
import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
|||
import org.thingsboard.server.controller.BaseController; |
|||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
|||
import org.thingsboard.server.service.action.EntityActionService; |
|||
import org.thingsboard.server.service.importing.BulkImportRequest.ColumnMapping; |
|||
import org.thingsboard.server.service.queue.TbClusterService; |
|||
import org.thingsboard.server.service.security.AccessValidator; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
import org.thingsboard.server.service.security.permission.AccessControlService; |
|||
import org.thingsboard.server.service.security.permission.Operation; |
|||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|||
import org.thingsboard.server.utils.CsvUtils; |
|||
|
|||
import javax.annotation.Nullable; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.function.Consumer; |
|||
import java.util.stream.Collectors; |
|||
import java.util.stream.Stream; |
|||
|
|||
@RequiredArgsConstructor |
|||
public abstract class AbstractBulkImportService<E extends BaseData<? extends EntityId>> { |
|||
protected final TelemetrySubscriptionService tsSubscriptionService; |
|||
protected final TbTenantProfileCache tenantProfileCache; |
|||
protected final AccessControlService accessControlService; |
|||
protected final AccessValidator accessValidator; |
|||
protected final EntityActionService entityActionService; |
|||
protected final TbClusterService clusterService; |
|||
|
|||
public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user, Consumer<ImportedEntityInfo<E>> onEntityImported) throws Exception { |
|||
BulkImportResult<E> result = new BulkImportResult<>(); |
|||
|
|||
AtomicInteger i = new AtomicInteger(0); |
|||
if (request.getMapping().getHeader()) { |
|||
i.incrementAndGet(); |
|||
} |
|||
|
|||
parseData(request).forEach(entityData -> { |
|||
i.incrementAndGet(); |
|||
try { |
|||
ImportedEntityInfo<E> importedEntityInfo = saveEntity(request, entityData, user); |
|||
onEntityImported.accept(importedEntityInfo); |
|||
|
|||
E entity = importedEntityInfo.getEntity(); |
|||
|
|||
saveKvs(user, entity, entityData); |
|||
|
|||
if (importedEntityInfo.isUpdated()) { |
|||
result.setUpdated(result.getUpdated() + 1); |
|||
} else { |
|||
result.setCreated(result.getCreated() + 1); |
|||
} |
|||
} catch (Exception e) { |
|||
result.setErrors(result.getErrors() + 1); |
|||
result.getErrorsList().add(String.format("Line %d: %s", i.get(), e.getMessage())); |
|||
} |
|||
}); |
|||
|
|||
return result; |
|||
} |
|||
|
|||
protected abstract ImportedEntityInfo<E> saveEntity(BulkImportRequest importRequest, Map<ColumnMapping, String> entityData, SecurityUser user); |
|||
|
|||
/* |
|||
* Attributes' values are firstly added to JsonObject in order to then make some type cast, |
|||
* because we get all values as strings from CSV |
|||
* */ |
|||
private void saveKvs(SecurityUser user, E entity, Map<ColumnMapping, String> data) { |
|||
Stream.of(BulkImportColumnType.SHARED_ATTRIBUTE, BulkImportColumnType.SERVER_ATTRIBUTE, BulkImportColumnType.TIMESERIES) |
|||
.map(kvType -> { |
|||
JsonObject kvs = new JsonObject(); |
|||
data.entrySet().stream() |
|||
.filter(dataEntry -> dataEntry.getKey().getType() == kvType && |
|||
StringUtils.isNotEmpty(dataEntry.getKey().getKey())) |
|||
.forEach(dataEntry -> kvs.add(dataEntry.getKey().getKey(), new JsonPrimitive(dataEntry.getValue()))); |
|||
return Map.entry(kvType, kvs); |
|||
}) |
|||
.filter(kvsEntry -> kvsEntry.getValue().entrySet().size() > 0) |
|||
.forEach(kvsEntry -> { |
|||
BulkImportColumnType kvType = kvsEntry.getKey(); |
|||
if (kvType == BulkImportColumnType.SHARED_ATTRIBUTE || kvType == BulkImportColumnType.SERVER_ATTRIBUTE) { |
|||
saveAttributes(user, entity, kvsEntry, kvType); |
|||
} else { |
|||
saveTelemetry(user, entity, kvsEntry); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
private void saveTelemetry(SecurityUser user, E entity, Map.Entry<BulkImportColumnType, JsonObject> kvsEntry) { |
|||
List<TsKvEntry> timeseries = JsonConverter.convertToTelemetry(kvsEntry.getValue(), System.currentTimeMillis()).entrySet().stream() |
|||
.flatMap(entry -> entry.getValue().stream().map(kvEntry -> new BasicTsKvEntry(entry.getKey(), kvEntry))) |
|||
.collect(Collectors.toList()); |
|||
|
|||
accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entity.getId(), (result, tenantId, entityId) -> { |
|||
TenantProfile tenantProfile = tenantProfileCache.get(tenantId); |
|||
long tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays()); |
|||
tsSubscriptionService.saveAndNotify(tenantId, user.getCustomerId(), entityId, timeseries, tenantTtl, new FutureCallback<Void>() { |
|||
@Override |
|||
public void onSuccess(@Nullable Void tmp) { |
|||
entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, |
|||
ActionType.TIMESERIES_UPDATED, null, timeseries); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, |
|||
ActionType.TIMESERIES_UPDATED, BaseController.toException(t), timeseries); |
|||
throw new RuntimeException(t); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
private void saveAttributes(SecurityUser user, E entity, Map.Entry<BulkImportColumnType, JsonObject> kvsEntry, BulkImportColumnType kvType) { |
|||
String scope = kvType.getKey(); |
|||
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(kvsEntry.getValue())); |
|||
|
|||
accessValidator.validateEntityAndCallback(user, Operation.WRITE_ATTRIBUTES, entity.getId(), (result, tenantId, entityId) -> { |
|||
tsSubscriptionService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<>() { |
|||
|
|||
@Override |
|||
public void onSuccess(Void unused) { |
|||
entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, |
|||
null, ActionType.ATTRIBUTES_UPDATED, null, scope, attributes); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable throwable) { |
|||
entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, |
|||
null, ActionType.ATTRIBUTES_UPDATED, BaseController.toException(throwable), |
|||
scope, attributes); |
|||
throw new RuntimeException(throwable); |
|||
} |
|||
|
|||
}); |
|||
}); |
|||
} |
|||
|
|||
protected final String getByColumnType(BulkImportColumnType bulkImportColumnType, Map<ColumnMapping, String> data) { |
|||
return data.entrySet().stream().filter(entry -> entry.getKey().getType() == bulkImportColumnType).findFirst().map(Map.Entry::getValue).orElse(null); |
|||
} |
|||
|
|||
private List<Map<ColumnMapping, String>> parseData(BulkImportRequest request) throws Exception { |
|||
List<List<String>> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter()); |
|||
if (request.getMapping().getHeader()) { |
|||
records.remove(0); |
|||
} |
|||
|
|||
List<ColumnMapping> columnsMappings = request.getMapping().getColumns(); |
|||
|
|||
return records.stream() |
|||
.map(record -> Stream.iterate(0, i -> i < record.size(), i -> i + 1) |
|||
.map(i -> Map.entry(columnsMappings.get(i), record.get(i))) |
|||
.filter(entry -> StringUtils.isNotEmpty(entry.getValue())) |
|||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) |
|||
.collect(Collectors.toList()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.importing; |
|||
|
|||
import lombok.Getter; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
|
|||
public enum BulkImportColumnType { |
|||
NAME, |
|||
TYPE, |
|||
LABEL, |
|||
SHARED_ATTRIBUTE(DataConstants.SHARED_SCOPE), |
|||
SERVER_ATTRIBUTE(DataConstants.SERVER_SCOPE), |
|||
TIMESERIES, |
|||
ACCESS_TOKEN, |
|||
X509, |
|||
MQTT_CLIENT_ID, |
|||
MQTT_USER_NAME, |
|||
MQTT_PASSWORD, |
|||
LWM2M_CLIENT_ENDPOINT("endpoint"), |
|||
LWM2M_CLIENT_SECURITY_CONFIG_MODE("securityConfigClientMode"), |
|||
LWM2M_CLIENT_IDENTITY("identity"), |
|||
LWM2M_CLIENT_KEY("key"), |
|||
LWM2M_CLIENT_CERT("cert"), |
|||
LWM2M_BOOTSTRAP_SERVER_SECURITY_MODE("securityMode"), |
|||
LWM2M_BOOTSTRAP_SERVER_PUBLIC_KEY_OR_ID("clientPublicKeyOrId"), |
|||
LWM2M_BOOTSTRAP_SERVER_SECRET_KEY("clientSecretKey"), |
|||
LWM2M_SERVER_SECURITY_MODE("securityMode"), |
|||
LWM2M_SERVER_CLIENT_PUBLIC_KEY_OR_ID("clientPublicKeyOrId"), |
|||
LWM2M_SERVER_CLIENT_SECRET_KEY("clientSecretKey"), |
|||
IS_GATEWAY, |
|||
DESCRIPTION, |
|||
EDGE_LICENSE_KEY, |
|||
CLOUD_ENDPOINT, |
|||
ROUTING_KEY, |
|||
SECRET; |
|||
|
|||
@Getter |
|||
private String key; |
|||
|
|||
BulkImportColumnType() { |
|||
} |
|||
|
|||
BulkImportColumnType(String key) { |
|||
this.key = key; |
|||
} |
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.importing; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class BulkImportRequest { |
|||
private String file; |
|||
private Mapping mapping; |
|||
|
|||
@Data |
|||
public static class Mapping { |
|||
private List<ColumnMapping> columns; |
|||
private Character delimiter; |
|||
private Boolean update; |
|||
private Boolean header; |
|||
} |
|||
|
|||
@Data |
|||
public static class ColumnMapping { |
|||
private BulkImportColumnType type; |
|||
private String key; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,30 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.importing; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.LinkedList; |
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class BulkImportResult<E> { |
|||
private int created = 0; |
|||
private int updated = 0; |
|||
private int errors = 0; |
|||
private List<String> errorsList = new LinkedList<>(); |
|||
|
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.importing; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class ImportedEntityInfo<E> { |
|||
private E entity; |
|||
private boolean isUpdated; |
|||
private E oldEntity; |
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.utils; |
|||
|
|||
import lombok.AccessLevel; |
|||
import lombok.NoArgsConstructor; |
|||
import org.apache.commons.csv.CSVFormat; |
|||
import org.apache.commons.csv.CSVRecord; |
|||
import org.apache.commons.io.input.CharSequenceReader; |
|||
|
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
import java.util.stream.Stream; |
|||
|
|||
@NoArgsConstructor(access = AccessLevel.PRIVATE) |
|||
public class CsvUtils { |
|||
|
|||
public static List<List<String>> parseCsv(String content, Character delimiter) throws Exception { |
|||
CSVFormat csvFormat = delimiter.equals(',') ? CSVFormat.DEFAULT : CSVFormat.DEFAULT.withDelimiter(delimiter); |
|||
|
|||
List<CSVRecord> records; |
|||
try (CharSequenceReader reader = new CharSequenceReader(content)) { |
|||
records = csvFormat.parse(reader).getRecords(); |
|||
} |
|||
|
|||
return records.stream() |
|||
.map(record -> Stream.iterate(0, i -> i < record.size(), i -> i + 1) |
|||
.map(record::get) |
|||
.collect(Collectors.toList())) |
|||
.collect(Collectors.toList()); |
|||
} |
|||
|
|||
} |
|||
Loading…
Reference in new issue