@ -15,15 +15,20 @@
* /
package org.thingsboard.server.service.job ;
import com.google.common.util.concurrent.FutureCallback ;
import com.google.common.util.concurrent.Futures ;
import com.google.common.util.concurrent.ListenableFuture ;
import com.google.common.util.concurrent.MoreExecutors ;
import jakarta.annotation.Nullable ;
import jakarta.annotation.PreDestroy ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.lang3.ObjectUtils ;
import org.springframework.context.event.EventListener ;
import org.springframework.stereotype.Component ;
import org.thingsboard.common.util.JacksonUtil ;
import org.thingsboard.common.util.ThingsBoardExecutors ;
import org.thingsboard.rule.engine.api.JobManager ;
import org.thingsboard.server.common.data.EntityType ;
import org.thingsboard.server.common.data.id.EntityId ;
import org.thingsboard.server.common.data.id.JobId ;
import org.thingsboard.server.common.data.id.TenantId ;
@ -33,7 +38,10 @@ import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType ;
import org.thingsboard.server.common.data.job.task.Task ;
import org.thingsboard.server.common.data.job.task.TaskResult ;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent ;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg ;
import org.thingsboard.server.common.msg.queue.ServiceType ;
import org.thingsboard.server.common.msg.queue.TbCallback ;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo ;
import org.thingsboard.server.dao.job.JobService ;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto ;
@ -50,7 +58,10 @@ import java.util.Arrays;
import java.util.List ;
import java.util.Map ;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.ScheduledExecutorService ;
import java.util.concurrent.TimeUnit ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
@ -65,6 +76,8 @@ public class DefaultJobManager implements JobManager {
private final Map < JobType , JobProcessor > jobProcessors ;
private final Map < JobType , TbQueueProducer < TbProtoQueueMsg < TaskProto > > > taskProducers ;
private final ExecutorService executor ;
private final ConcurrentHashMap < JobId , TbCallback > finishCallbacks = new ConcurrentHashMap < > ( ) ;
private final ScheduledExecutorService cleanupScheduler ;
public DefaultJobManager ( JobService jobService , JobStatsService jobStatsService , PartitionService partitionService ,
TaskProducerQueueFactory queueFactory , TasksQueueConfig queueConfig ,
@ -76,6 +89,8 @@ public class DefaultJobManager implements JobManager {
this . jobProcessors = jobProcessors . stream ( ) . collect ( Collectors . toMap ( JobProcessor : : getType , Function . identity ( ) ) ) ;
this . taskProducers = Arrays . stream ( JobType . values ( ) ) . collect ( Collectors . toMap ( Function . identity ( ) , queueFactory : : createTaskProducer ) ) ;
this . executor = ThingsBoardExecutors . newWorkStealingPool ( Math . max ( 4 , Runtime . getRuntime ( ) . availableProcessors ( ) ) , getClass ( ) ) ;
this . cleanupScheduler = ThingsBoardExecutors . newSingleThreadScheduledExecutor ( "job-callback-cleanup" ) ;
this . cleanupScheduler . scheduleWithFixedDelay ( this : : cleanupStaleCallbacks , 1 , 1 , TimeUnit . HOURS ) ;
}
@Override
@ -84,6 +99,25 @@ public class DefaultJobManager implements JobManager {
return Futures . submit ( ( ) - > jobService . saveJob ( job . getTenantId ( ) , job ) , executor ) ;
}
@Override
public ListenableFuture < Job > submitJob ( Job job , TbCallback finishCallback ) {
ListenableFuture < Job > saveFuture = submitJob ( job ) ;
if ( finishCallback ! = null ) {
Futures . addCallback ( saveFuture , new FutureCallback < > ( ) {
@Override
public void onSuccess ( Job savedJob ) {
finishCallbacks . put ( savedJob . getId ( ) , finishCallback ) ;
}
@Override
public void onFailure ( Throwable t ) {
finishCallback . onFailure ( t ) ;
}
} , MoreExecutors . directExecutor ( ) ) ;
}
return saveFuture ;
}
@Override
public void onJobUpdate ( Job job ) {
JobStatus status = job . getStatus ( ) ;
@ -109,6 +143,35 @@ public class DefaultJobManager implements JobManager {
}
}
@EventListener
public void onJobUpdateEvent ( ComponentLifecycleMsg event ) {
EntityId entityId = event . getEntityId ( ) ;
if ( entityId . getEntityType ( ) ! = EntityType . JOB ) {
return ;
}
ComponentLifecycleEvent lifecycleEvent = event . getEvent ( ) ;
if ( ! lifecycleEvent . equals ( ComponentLifecycleEvent . STOPPED ) & &
! lifecycleEvent . equals ( ComponentLifecycleEvent . FAILED ) & &
! lifecycleEvent . equals ( ComponentLifecycleEvent . UPDATED ) ) {
return ;
}
JobId jobId = new JobId ( entityId . getId ( ) ) ;
TbCallback callback = finishCallbacks . remove ( jobId ) ;
if ( callback = = null ) {
return ;
}
executor . execute ( ( ) - > {
try {
Job job = jobService . findJobById ( event . getTenantId ( ) , jobId ) ;
invokeFinishCallback ( job , callback ) ;
} catch ( Throwable e ) {
log . error ( "[{}] Failed to invoke finish callback" , jobId , e ) ;
callback . onFailure ( e ) ;
}
} ) ;
}
private void processJob ( Job job ) {
TenantId tenantId = job . getTenantId ( ) ;
JobId jobId = job . getId ( ) ;
@ -195,12 +258,41 @@ public class DefaultJobManager implements JobManager {
} ) ;
}
private void invokeFinishCallback ( @Nullable Job job , TbCallback callback ) {
if ( job = = null ) {
callback . onFailure ( new RuntimeException ( "Job not found" ) ) ;
} else if ( job . getStatus ( ) = = JobStatus . COMPLETED ) {
callback . onSuccess ( ) ;
} else {
callback . onFailure ( new RuntimeException ( job . getError ( ) ) ) ;
}
}
private void cleanupStaleCallbacks ( ) {
finishCallbacks . entrySet ( ) . removeIf ( entry - > {
JobId jobId = entry . getKey ( ) ;
try {
Job job = jobService . findJobById ( TenantId . SYS_TENANT_ID , jobId ) ;
if ( job = = null | | job . getStatus ( ) . isOneOf ( JobStatus . COMPLETED , JobStatus . FAILED , JobStatus . CANCELLED ) ) {
invokeFinishCallback ( job , entry . getValue ( ) ) ;
return true ;
}
return false ;
} catch ( Throwable e ) {
log . error ( "[{}] Failed to cleanup stale callback" , jobId , e ) ;
entry . getValue ( ) . onFailure ( e ) ;
return true ;
}
} ) ;
}
private JobProcessor getJobProcessor ( JobType jobType ) {
return jobProcessors . get ( jobType ) ;
}
@PreDestroy
private void destroy ( ) {
cleanupScheduler . shutdownNow ( ) ;
executor . shutdownNow ( ) ;
}