@ -15,7 +15,10 @@
* /
package org.thingsboard.server.service.sync.vc ;
import com.google.common.collect.Iterables ;
import com.google.common.util.concurrent.Futures ;
import com.google.common.util.concurrent.ListenableFuture ;
import com.google.common.util.concurrent.MoreExecutors ;
import com.google.common.util.concurrent.SettableFuture ;
import com.google.protobuf.ByteString ;
import lombok.SneakyThrows ;
@ -23,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.context.annotation.Lazy ;
import org.springframework.stereotype.Service ;
import org.thingsboard.common.util.CollectionsUtil ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.server.cluster.TbClusterService ;
import org.thingsboard.server.common.data.EntityType ;
@ -71,12 +75,16 @@ import org.thingsboard.server.service.sync.vc.data.VersionsDiffGitRequest;
import org.thingsboard.server.service.sync.vc.data.VoidGitRequest ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.Optional ;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeoutException ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.function.Consumer ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
@ -93,9 +101,12 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private final SchedulerComponent scheduler ;
private final Map < UUID , PendingGitRequest < ? > > pendingRequestMap = new HashMap < > ( ) ;
private final Map < UUID , Map < String , String [ ] > > chunkedMsgs = new ConcurrentHashMap < > ( ) ;
@Value ( "${queue.vc.request-timeout:60000}" )
private int requestTimeout ;
@Value ( "${queue.vc.msg-chunk-size:500000}" )
private int msgChunkSize ;
public DefaultGitVersionControlQueueService ( TbServiceInfoProvider serviceInfoProvider , TbClusterService clusterService ,
DataDecodingEncodingService encodingService ,
@ -119,20 +130,35 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
return future ;
}
@SuppressWarnings ( "UnstableApiUsage" )
@Override
public ListenableFuture < Void > addToCommit ( CommitGitRequest commit , EntityExportData < ExportableEntity < EntityId > > entityData ) {
SettableFuture < Void > future = SettableFuture . create ( ) ;
String path = getRelativePath ( entityData . getEntityType ( ) , entityData . getExternalId ( ) ) ;
String entityDataJson = JacksonUtil . toPrettyString ( entityData . sort ( ) ) ;
registerAndSend ( commit , builder - > builder . setCommitRequest (
buildCommitRequest ( commit ) . setAddMsg (
TransportProtos . AddMsg . newBuilder ( )
. setRelativePath ( path ) . setEntityDataJson ( entityDataJson ) . build ( )
) . build ( )
) . build ( ) , wrap ( future , null ) ) ;
return future ;
Iterable < String > entityDataChunks = StringUtils . split ( entityDataJson , msgChunkSize ) ;
String chunkedMsgId = UUID . randomUUID ( ) . toString ( ) ;
int chunksCount = Iterables . size ( entityDataChunks ) ;
AtomicInteger chunkIndex = new AtomicInteger ( ) ;
List < ListenableFuture < Void > > futures = new ArrayList < > ( ) ;
entityDataChunks . forEach ( chunk - > {
SettableFuture < Void > chunkFuture = SettableFuture . create ( ) ;
log . trace ( "[{}] sending chunk {} for 'addToCommit'" , chunkedMsgId , chunkIndex . get ( ) ) ;
registerAndSend ( commit , builder - > builder . setCommitRequest (
buildCommitRequest ( commit ) . setAddMsg (
TransportProtos . AddMsg . newBuilder ( )
. setRelativePath ( path ) . setEntityDataJsonChunk ( chunk )
. setChunkedMsgId ( chunkedMsgId ) . setChunkIndex ( chunkIndex . getAndIncrement ( ) )
. setChunksCount ( chunksCount ) . build ( )
) . build ( )
) . build ( ) , wrap ( chunkFuture , null ) ) ;
futures . add ( chunkFuture ) ;
} ) ;
return Futures . transform ( Futures . allAsList ( futures ) , r - > {
log . trace ( "[{}] sent all chunks for 'addToCommit'" , chunkedMsgId ) ;
return null ;
} , MoreExecutors . directExecutor ( ) ) ;
}
@Override
@ -221,7 +247,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override
public ListenableFuture < List < VersionedEntityInfo > > listEntitiesAtVersion ( TenantId tenantId , String branch , String versionId , EntityType entityType ) {
return listEntitiesAtVersion ( tenantId , ListEntitiesRequestMsg . newBuilder ( )
. setBranchName ( branch )
. setVersionId ( versionId )
. setEntityType ( entityType . name ( ) )
. build ( ) ) ;
@ -230,7 +255,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override
public ListenableFuture < List < VersionedEntityInfo > > listEntitiesAtVersion ( TenantId tenantId , String branch , String versionId ) {
return listEntitiesAtVersion ( tenantId , ListEntitiesRequestMsg . newBuilder ( )
. setBranchName ( branch )
. setVersionId ( versionId )
. build ( ) ) ;
}
@ -257,18 +281,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
. build ( ) ) ) ;
}
@Override
public ListenableFuture < String > getContentsDiff ( TenantId tenantId , String content1 , String content2 ) {
ContentsDiffGitRequest request = new ContentsDiffGitRequest ( tenantId , content1 , content2 ) ;
return sendRequest ( request , builder - > builder . setContentsDiffRequest ( TransportProtos . ContentsDiffRequestMsg . newBuilder ( )
. setContent1 ( content1 )
. setContent2 ( content2 ) ) ) ;
}
@Override
@SuppressWarnings ( "rawtypes" )
public ListenableFuture < EntityExportData > getEntity ( TenantId tenantId , String versionId , EntityId entityId ) {
EntityContentGitRequest request = new EntityContentGitRequest ( tenantId , versionId , entityId ) ;
chunkedMsgs . put ( request . getRequestId ( ) , new HashMap < > ( ) ) ;
registerAndSend ( request , builder - > builder . setEntityContentRequest ( EntityContentRequestMsg . newBuilder ( )
. setVersionId ( versionId )
. setEntityType ( entityId . getEntityType ( ) . name ( ) )
@ -290,9 +307,9 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
var requestBody = enrichFunction . apply ( newRequestProto ( request , settings ) ) ;
log . trace ( "[{}][{}] PUSHING request: {}" , request . getTenantId ( ) , request . getRequestId ( ) , requestBody ) ;
clusterService . pushMsgToVersionControl ( request . getTenantId ( ) , requestBody , callback ) ;
request . setTimeoutTask ( scheduler . schedule ( ( ) - > {
processTimeout ( request . getRequestId ( ) ) ;
} , requestTimeout , TimeUnit . MILLISECONDS ) ) ;
if ( request . getTimeoutTask ( ) = = null ) {
request . setTimeoutTask ( scheduler . schedule ( ( ) - > processTimeout ( request . getRequestId ( ) ) , requestTimeout , TimeUnit . MILLISECONDS ) ) ;
}
} else {
throw new RuntimeException ( "Future is already done!" ) ;
}
@ -310,7 +327,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@SuppressWarnings ( "rawtypes" )
public ListenableFuture < List < EntityExportData > > getEntities ( TenantId tenantId , String versionId , EntityType entityType , int offset , int limit ) {
EntitiesContentGitRequest request = new EntitiesContentGitRequest ( tenantId , versionId , entityType ) ;
chunkedMsgs . put ( request . getRequestId ( ) , new HashMap < > ( ) ) ;
registerAndSend ( request , builder - > builder . setEntitiesContentRequest ( EntitiesContentRequestMsg . newBuilder ( )
. setVersionId ( versionId )
. setEntityType ( entityType . name ( ) )
@ -356,15 +373,15 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override
public void processResponse ( VersionControlResponseMsg vcResponseMsg ) {
UUID requestId = new UUID ( vcResponseMsg . getRequestIdMSB ( ) , vcResponseMsg . getRequestIdLSB ( ) ) ;
PendingGitRequest < ? > request = pendingRequestMap . remove ( requestId ) ;
PendingGitRequest < ? > request = pendingRequestMap . get ( requestId ) ;
if ( request = = null ) {
log . debug ( "[{}] received stale response: {}" , requestId , vcResponseMsg ) ;
return ;
} else {
log . debug ( "[{}] processing response: {}" , requestId , vcResponseMsg ) ;
request . getTimeoutTask ( ) . cancel ( true ) ;
}
var future = request . getFuture ( ) ;
boolean completed = true ;
if ( ! StringUtils . isEmpty ( vcResponseMsg . getError ( ) ) ) {
future . setException ( new RuntimeException ( vcResponseMsg . getError ( ) ) ) ;
} else {
@ -392,12 +409,28 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
var listVersionsResponse = vcResponseMsg . getListVersionsResponse ( ) ;
( ( ListVersionsGitRequest ) request ) . getFuture ( ) . set ( toPageData ( listVersionsResponse ) ) ;
} else if ( vcResponseMsg . hasEntityContentResponse ( ) ) {
var data = vcResponseMsg . getEntityContentResponse ( ) . getData ( ) ;
( ( EntityContentGitRequest ) request ) . getFuture ( ) . set ( toData ( data ) ) ;
TransportProtos . EntityContentResponseMsg responseMsg = vcResponseMsg . getEntityContentResponse ( ) ;
log . trace ( "[{}] received chunk {} for 'getEntity'" , responseMsg . getChunkedMsgId ( ) , responseMsg . getChunkIndex ( ) ) ;
var joined = joinChunks ( requestId , responseMsg , 1 ) ;
if ( joined . isPresent ( ) ) {
log . trace ( "[{}] collected all chunks for 'getEntity'" , responseMsg . getChunkedMsgId ( ) ) ;
( ( EntityContentGitRequest ) request ) . getFuture ( ) . set ( joined . get ( ) . get ( 0 ) ) ;
} else {
completed = false ;
}
} else if ( vcResponseMsg . hasEntitiesContentResponse ( ) ) {
var dataList = vcResponseMsg . getEntitiesContentResponse ( ) . getDataList ( ) ;
( ( EntitiesContentGitRequest ) request ) . getFuture ( )
. set ( dataList . stream ( ) . map ( this : : toData ) . collect ( Collectors . toList ( ) ) ) ;
TransportProtos . EntitiesContentResponseMsg responseMsg = vcResponseMsg . getEntitiesContentResponse ( ) ;
TransportProtos . EntityContentResponseMsg item = responseMsg . getItem ( ) ;
if ( responseMsg . getItemsCount ( ) > 0 ) {
var joined = joinChunks ( requestId , item , responseMsg . getItemsCount ( ) ) ;
if ( joined . isPresent ( ) ) {
( ( EntitiesContentGitRequest ) request ) . getFuture ( ) . set ( joined . get ( ) ) ;
} else {
completed = false ;
}
} else {
( ( EntitiesContentGitRequest ) request ) . getFuture ( ) . set ( Collections . emptyList ( ) ) ;
}
} else if ( vcResponseMsg . hasVersionsDiffResponse ( ) ) {
TransportProtos . VersionsDiffResponseMsg diffResponse = vcResponseMsg . getVersionsDiffResponse ( ) ;
List < EntityVersionsDiff > entityVersionsDiffList = diffResponse . getDiffList ( ) . stream ( )
@ -412,21 +445,50 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
. build ( ) )
. collect ( Collectors . toList ( ) ) ;
( ( VersionsDiffGitRequest ) request ) . getFuture ( ) . set ( entityVersionsDiffList ) ;
} else if ( vcResponseMsg . hasContentsDiffResponse ( ) ) {
String diff = vcResponseMsg . getContentsDiffResponse ( ) . getDiff ( ) ;
( ( ContentsDiffGitRequest ) request ) . getFuture ( ) . set ( diff ) ;
}
}
if ( completed ) {
removePendingRequest ( requestId ) ;
}
}
@SuppressWarnings ( "rawtypes" )
private Optional < List < EntityExportData > > joinChunks ( UUID requestId , TransportProtos . EntityContentResponseMsg responseMsg , int expectedMsgCount ) {
var chunksMap = chunkedMsgs . get ( requestId ) ;
if ( chunksMap = = null ) {
return Optional . empty ( ) ;
}
String [ ] msgChunks = chunksMap . computeIfAbsent ( responseMsg . getChunkedMsgId ( ) , id - > new String [ responseMsg . getChunksCount ( ) ] ) ;
msgChunks [ responseMsg . getChunkIndex ( ) ] = responseMsg . getData ( ) ;
if ( chunksMap . size ( ) = = expectedMsgCount & & chunksMap . values ( ) . stream ( )
. allMatch ( chunks - > CollectionsUtil . countNonNull ( chunks ) = = chunks . length ) ) {
return Optional . of ( chunksMap . values ( ) . stream ( )
. map ( chunks - > String . join ( "" , chunks ) )
. map ( this : : toData )
. collect ( Collectors . toList ( ) ) ) ;
} else {
return Optional . empty ( ) ;
}
}
private void processTimeout ( UUID requestId ) {
PendingGitRequest < ? > pendingRequest = pendingRequestMap . remove ( requestId ) ;
PendingGitRequest < ? > pendingRequest = removePendingRequest ( requestId ) ;
if ( pendingRequest ! = null ) {
log . debug ( "[{}] request timed out ({} ms}" , requestId , requestTimeout ) ;
pendingRequest . getFuture ( ) . setException ( new TimeoutException ( "Request timed out" ) ) ;
}
}
private PendingGitRequest < ? > removePendingRequest ( UUID requestId ) {
PendingGitRequest < ? > pendingRequest = pendingRequestMap . remove ( requestId ) ;
if ( pendingRequest ! = null & & pendingRequest . getTimeoutTask ( ) ! = null ) {
pendingRequest . getTimeoutTask ( ) . cancel ( true ) ;
pendingRequest . setTimeoutTask ( null ) ;
}
chunkedMsgs . remove ( requestId ) ;
return pendingRequest ;
}
private PageData < EntityVersion > toPageData ( TransportProtos . ListVersionsResponseMsg listVersionsResponse ) {
var listVersions = listVersionsResponse . getVersionsList ( ) . stream ( ) . map ( this : : getEntityVersion ) . collect ( Collectors . toList ( ) ) ;
return new PageData < > ( listVersions , listVersionsResponse . getTotalPages ( ) , listVersionsResponse . getTotalElements ( ) , listVersionsResponse . getHasNext ( ) ) ;
@ -450,6 +512,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
return JacksonUtil . fromString ( data , EntityExportData . class ) ;
}
//The future will be completed when the corresponding result arrives from kafka
private static < T > TbQueueCallback wrap ( SettableFuture < T > future ) {
return new TbQueueCallback ( ) {
@Override
@ -463,7 +526,8 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
} ;
}
private static < T > TbQueueCallback wrap ( SettableFuture < T > future , T value ) {
//The future will be completed when the request is successfully sent to kafka
private < T > TbQueueCallback wrap ( SettableFuture < T > future , T value ) {
return new TbQueueCallback ( ) {
@Override
public void onSuccess ( TbQueueMsgMetadata metadata ) {