@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.HasId ;
import org.thingsboard.server.common.data.id.TenantId ;
import org.thingsboard.server.common.data.page.PageData ;
import org.thingsboard.server.common.data.page.PageDataIterable ;
import org.thingsboard.server.common.data.page.PageLink ;
import org.thingsboard.server.common.data.sync.ie.EntityExportData ;
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings ;
@ -89,6 +90,7 @@ import java.util.Collections;
import java.util.HashSet ;
import java.util.List ;
import java.util.Optional ;
import java.util.Set ;
import java.util.UUID ;
import java.util.concurrent.ExecutionException ;
import java.util.function.Function ;
@ -246,16 +248,18 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
switch ( request . getType ( ) ) {
case SINGLE_ENTITY : {
SingleEntityVersionLoadRequest versionLoadRequest = ( SingleEntityVersionLoadRequest ) request ;
ctx . setRollbackOnError ( true ) ;
VersionLoadConfig config = versionLoadRequest . getConfig ( ) ;
ListenableFuture < EntityExportData > future = gitServiceQueue . getEntity ( user . getTenantId ( ) , request . getVersionId ( ) , versionLoadRequest . getExternalEntityId ( ) ) ;
DonAsynchron . withCallback ( future ,
entityData - > doInTemplate ( ctx , request , c - > loadSingleEntity ( c , config , entityData ) ) ,
entityData - > load ( ctx , request , c - > loadSingleEntity ( c , config , entityData ) ) ,
e - > processLoadError ( ctx , e ) , executor ) ;
break ;
}
case ENTITY_TYPE : {
EntityTypeVersionLoadRequest versionLoadRequest = ( EntityTypeVersionLoadRequest ) request ;
executor . submit ( ( ) - > doInTemplate ( ctx , request , c - > loadMultipleEntities ( c , versionLoadRequest ) ) ) ;
ctx . setRollbackOnError ( versionLoadRequest . isRollbackOnError ( ) ) ;
executor . submit ( ( ) - > load ( ctx , request , c - > loadMultipleEntities ( c , versionLoadRequest ) ) ) ;
break ;
}
default :
@ -265,19 +269,24 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
return ctx . getRequestId ( ) ;
}
private < R > VersionLoadResult doInTemplate ( EntitiesImportCtx ctx , VersionLoadRequest request , Function < EntitiesImportCtx , VersionLoadResult > f unction) {
private < R > VersionLoadResult load ( EntitiesImportCtx ctx , VersionLoadRequest request , Function < EntitiesImportCtx , VersionLoadResult > loadF unction) {
try {
VersionLoadResult result = transactionTemplate . execute ( status - > {
try {
return function . apply ( ctx ) ;
} catch ( RuntimeException e ) {
throw e ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ; // to prevent UndeclaredThrowableException
VersionLoadResult result ;
if ( ctx . isRollbackOnError ( ) ) {
result = transactionTemplate . execute ( status - > {
try {
return loadFunction . apply ( ctx ) ;
} catch ( RuntimeException e ) {
throw e ;
} catch ( Exception e ) {
throw new RuntimeException ( e ) ; // to prevent UndeclaredThrowableException
}
} ) ;
for ( ThrowingRunnable eventCallback : ctx . getEventCallbacks ( ) ) {
eventCallback . run ( ) ;
}
} ) ;
for ( ThrowingRunnable throwingRunnable : ctx . getEventCallbacks ( ) ) {
throwingRunnable . run ( ) ;
} else {
result = loadFunction . apply ( ctx ) ;
}
result . setDone ( true ) ;
return cachePut ( ctx . getRequestId ( ) , result ) ;
@ -324,7 +333,6 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
sw . startNew ( "Entities " + entityType . name ( ) ) ;
ctx . setSettings ( getEntityImportSettings ( request , entityType ) ) ;
importEntities ( ctx , entityType ) ;
persistToCache ( ctx ) ;
}
sw . startNew ( "Reimport" ) ;
@ -336,7 +344,6 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
. filter ( entityType - > request . getEntityTypes ( ) . get ( entityType ) . isRemoveOtherEntities ( ) )
. sorted ( exportImportService . getEntityTypeComparatorForImport ( ) . reversed ( ) )
. forEach ( entityType - > removeOtherEntities ( ctx , entityType ) ) ;
persistToCache ( ctx ) ;
sw . startNew ( "References and Relations" ) ;
exportImportService . saveReferencesAndRelations ( ctx ) ;
@ -389,6 +396,8 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
ctx . getImportedEntities ( ) . computeIfAbsent ( entityType , t - > new HashSet < > ( ) )
. add ( importResult . getSavedEntity ( ) . getId ( ) ) ;
}
persistToCache ( ctx ) ;
log . debug ( "Imported {} pack ({}) for tenant {}" , entityType , entityDataList . size ( ) , ctx . getTenantId ( ) ) ;
offset + = limit ;
} while ( entityDataList . size ( ) = = limit ) ;
@ -413,17 +422,34 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
}
private void removeOtherEntities ( EntitiesImportCtx ctx , EntityType entityType ) {
DaoUtil . processInBatches ( pageLink - > {
return exportableEntitiesService . findEntitiesByTenantId ( ctx . getTenantId ( ) , entityType , pageLink ) ;
} , 100 , entity - > {
if ( ctx . getImportedEntities ( ) . get ( entityType ) = = null | | ! ctx . getImportedEntities ( ) . get ( entityType ) . contains ( entity . getId ( ) ) ) {
exportableEntitiesService . removeById ( ctx . getTenantId ( ) , entity . getId ( ) ) ;
ctx . addEventCallback ( ( ) - > logEntityActionService . logEntityAction ( ctx . getTenantId ( ) , entity . getId ( ) , entity , null ,
ActionType . DELETED , ctx . getUser ( ) ) ) ;
ctx . registerDeleted ( entityType ) ;
var entities = new PageDataIterable < > ( link - > exportableEntitiesService . findEntitiesIdsByTenantId ( ctx . getTenantId ( ) , entityType , link ) , 100 ) ;
Set < EntityId > toRemove = new HashSet < > ( ) ;
for ( EntityId entityId : entities ) {
if ( ctx . getImportedEntities ( ) . get ( entityType ) = = null | | ! ctx . getImportedEntities ( ) . get ( entityType ) . contains ( entityId ) ) {
toRemove . add ( entityId ) ;
}
} ) ;
}
for ( EntityId entityId : toRemove ) {
ExportableEntity < EntityId > entity = exportableEntitiesService . findEntityById ( entityId ) ;
exportableEntitiesService . removeById ( ctx . getTenantId ( ) , entityId ) ;
ThrowingRunnable callback = ( ) - > {
logEntityActionService . logEntityAction ( ctx . getTenantId ( ) , entity . getId ( ) , entity , null ,
ActionType . DELETED , ctx . getUser ( ) ) ;
} ;
if ( ctx . isRollbackOnError ( ) ) {
ctx . addEventCallback ( callback ) ;
} else {
try {
callback . run ( ) ;
} catch ( ThingsboardException e ) {
throw new RuntimeException ( e ) ;
}
}
ctx . registerDeleted ( entityType ) ;
}
persistToCache ( ctx ) ;
}
private VersionLoadResult onError ( EntityId externalId , Throwable e ) {