|
|
|
@ -22,6 +22,9 @@ import lombok.Data; |
|
|
|
import lombok.RequiredArgsConstructor; |
|
|
|
import lombok.SneakyThrows; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.apache.commons.lang3.exception.ExceptionUtils; |
|
|
|
import org.thingsboard.common.util.DonAsynchron; |
|
|
|
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|
|
|
import org.thingsboard.server.cluster.TbClusterService; |
|
|
|
import org.thingsboard.server.common.data.BaseData; |
|
|
|
import org.thingsboard.server.common.data.TenantProfile; |
|
|
|
@ -47,11 +50,16 @@ import org.thingsboard.server.utils.CsvUtils; |
|
|
|
import org.thingsboard.server.utils.TypeCastUtil; |
|
|
|
|
|
|
|
import javax.annotation.Nullable; |
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.LinkedHashMap; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.LinkedBlockingQueue; |
|
|
|
import java.util.concurrent.ThreadPoolExecutor; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
import java.util.function.Consumer; |
|
|
|
@ -67,39 +75,49 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent |
|
|
|
protected final EntityActionService entityActionService; |
|
|
|
protected final TbClusterService clusterService; |
|
|
|
|
|
|
|
public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user, Consumer<ImportedEntityInfo<E>> onEntityImported) throws Exception { |
|
|
|
BulkImportResult<E> result = new BulkImportResult<>(); |
|
|
|
private static ThreadPoolExecutor executor; |
|
|
|
|
|
|
|
AtomicInteger i = new AtomicInteger(0); |
|
|
|
if (request.getMapping().getHeader()) { |
|
|
|
i.incrementAndGet(); |
|
|
|
@PostConstruct |
|
|
|
private void initExecutor() { |
|
|
|
if (executor == null) { |
|
|
|
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), |
|
|
|
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(150_000), |
|
|
|
ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy()); |
|
|
|
executor.allowCoreThreadTimeOut(true); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
parseData(request).forEach(entityData -> { |
|
|
|
i.incrementAndGet(); |
|
|
|
try { |
|
|
|
ImportedEntityInfo<E> importedEntityInfo = saveEntity(request, entityData.getFields(), user); |
|
|
|
onEntityImported.accept(importedEntityInfo); |
|
|
|
public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user, Consumer<ImportedEntityInfo<E>> onEntityImported) throws Exception { |
|
|
|
List<EntityData> entitiesData = parseData(request); |
|
|
|
|
|
|
|
E entity = importedEntityInfo.getEntity(); |
|
|
|
BulkImportResult<E> result = new BulkImportResult<>(); |
|
|
|
CountDownLatch completionLatch = new CountDownLatch(entitiesData.size()); |
|
|
|
|
|
|
|
saveKvs(user, entity, entityData.getKvs()); |
|
|
|
entitiesData.forEach(entityData -> DonAsynchron.submit(() -> { |
|
|
|
ImportedEntityInfo<E> importedEntityInfo = saveEntity(request, entityData.getFields(), user); |
|
|
|
E entity = importedEntityInfo.getEntity(); |
|
|
|
|
|
|
|
if (importedEntityInfo.getRelatedError() != null) { |
|
|
|
throw new RuntimeException(importedEntityInfo.getRelatedError()); |
|
|
|
} |
|
|
|
onEntityImported.accept(importedEntityInfo); |
|
|
|
saveKvs(user, entity, entityData.getKvs()); |
|
|
|
|
|
|
|
if (importedEntityInfo.isUpdated()) { |
|
|
|
result.setUpdated(result.getUpdated() + 1); |
|
|
|
} else { |
|
|
|
result.setCreated(result.getCreated() + 1); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
result.setErrors(result.getErrors() + 1); |
|
|
|
result.getErrorsList().add(String.format("Line %d: %s", i.get(), e.getMessage())); |
|
|
|
} |
|
|
|
}); |
|
|
|
return importedEntityInfo; |
|
|
|
}, |
|
|
|
importedEntityInfo -> { |
|
|
|
if (importedEntityInfo.isUpdated()) { |
|
|
|
result.getUpdated().incrementAndGet(); |
|
|
|
} else { |
|
|
|
result.getCreated().incrementAndGet(); |
|
|
|
} |
|
|
|
completionLatch.countDown(); |
|
|
|
}, |
|
|
|
throwable -> { |
|
|
|
result.getErrors().incrementAndGet(); |
|
|
|
result.getErrorsList().add(String.format("Line %d: %s", entityData.getLineNumber(), ExceptionUtils.getRootCauseMessage(throwable))); |
|
|
|
completionLatch.countDown(); |
|
|
|
}, |
|
|
|
executor)); |
|
|
|
|
|
|
|
completionLatch.await(); |
|
|
|
return result; |
|
|
|
} |
|
|
|
|
|
|
|
@ -186,8 +204,11 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent |
|
|
|
|
|
|
|
private List<EntityData> parseData(BulkImportRequest request) throws Exception { |
|
|
|
List<List<String>> records = CsvUtils.parseCsv(request.getFile(), request.getMapping().getDelimiter()); |
|
|
|
AtomicInteger linesCounter = new AtomicInteger(0); |
|
|
|
|
|
|
|
if (request.getMapping().getHeader()) { |
|
|
|
records.remove(0); |
|
|
|
linesCounter.incrementAndGet(); |
|
|
|
} |
|
|
|
|
|
|
|
List<ColumnMapping> columnsMappings = request.getMapping().getColumns(); |
|
|
|
@ -205,15 +226,24 @@ public abstract class AbstractBulkImportService<E extends BaseData<? extends Ent |
|
|
|
entityData.getKvs().put(entry.getKey(), new ParsedValue(castResult.getValue(), castResult.getKey())); |
|
|
|
} |
|
|
|
}); |
|
|
|
entityData.setLineNumber(linesCounter.incrementAndGet()); |
|
|
|
return entityData; |
|
|
|
}) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
} |
|
|
|
|
|
|
|
@PreDestroy |
|
|
|
private void shutdownExecutor() { |
|
|
|
if (!executor.isTerminating()) { |
|
|
|
executor.shutdown(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Data |
|
|
|
protected static class EntityData { |
|
|
|
private final Map<BulkImportColumnType, String> fields = new LinkedHashMap<>(); |
|
|
|
private final Map<ColumnMapping, ParsedValue> kvs = new LinkedHashMap<>(); |
|
|
|
private int lineNumber; |
|
|
|
} |
|
|
|
|
|
|
|
@Data |
|
|
|
|