From e084fc21588e8bde5c79a050e50cfe79c76ae48c Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Fri, 20 Feb 2026 21:11:57 +0200 Subject: [PATCH] Add job finish callback mechanism Introduce TbCallback-based finish notification for submitted jobs, allowing callers to be notified when a job reaches a terminal state (COMPLETED, FAILED, CANCELLED) via cluster-wide ComponentLifecycleMsg broadcast. Co-Authored-By: Claude Opus 4.6 --- .../entitiy/EntityStateSourcingListener.java | 7 +- .../server/service/job/DefaultJobManager.java | 92 ++++++++++++++++ .../server/service/job/JobManagerTest.java | 101 +++++++++++++++++- .../server/common/data/job/Job.java | 34 ++++++ .../common/data/job/task/DummyTaskResult.java | 5 + .../common/data/job/task/TaskResult.java | 3 + .../rule/engine/api/JobManager.java | 3 + 7 files changed, 242 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index 5f4a6d29ee..c3d275883e 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; +import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -360,10 +361,12 @@ public class EntityStateSourcingListener { jobManager.onJobUpdate(job); ComponentLifecycleEvent event; - if (job.getResult().getCancellationTs() > 0) { + if (job.getStatus() == JobStatus.CANCELLED) { event = ComponentLifecycleEvent.STOPPED; - } else if (job.getResult().getGeneralError() != null) { + } else if (job.getStatus() == JobStatus.FAILED) { event = ComponentLifecycleEvent.FAILED; + } else if (job.getStatus() == JobStatus.COMPLETED) { + event = ComponentLifecycleEvent.UPDATED; } else { return; } diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index f17f10bfd2..d14ba2067a 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -15,15 +15,20 @@ */ package org.thingsboard.server.service.job; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.Nullable; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.JobManager; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; @@ -33,7 +38,10 @@ import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.Task; import org.thingsboard.server.common.data.job.task.TaskResult; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; @@ -50,7 +58,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -65,6 +76,8 @@ public class DefaultJobManager implements JobManager { private final Map jobProcessors; private final Map>> taskProducers; private final ExecutorService executor; + private final ConcurrentHashMap finishCallbacks = new ConcurrentHashMap<>(); + private final ScheduledExecutorService cleanupScheduler; public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, PartitionService partitionService, TaskProducerQueueFactory queueFactory, TasksQueueConfig queueConfig, @@ -76,6 +89,8 @@ public class DefaultJobManager implements JobManager { this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass()); + this.cleanupScheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("job-callback-cleanup"); + this.cleanupScheduler.scheduleWithFixedDelay(this::cleanupStaleCallbacks, 1, 1, TimeUnit.HOURS); } @Override @@ -84,6 +99,25 @@ public class DefaultJobManager implements JobManager { return Futures.submit(() -> jobService.saveJob(job.getTenantId(), job), executor); } + @Override + public ListenableFuture submitJob(Job job, TbCallback finishCallback) { + ListenableFuture saveFuture = submitJob(job); + if (finishCallback != null) { + Futures.addCallback(saveFuture, new FutureCallback<>() { + @Override + public void onSuccess(Job savedJob) { + finishCallbacks.put(savedJob.getId(), finishCallback); + } + + @Override + public void onFailure(Throwable t) { + finishCallback.onFailure(t); + } + }, MoreExecutors.directExecutor()); + } + return saveFuture; + } + @Override public void onJobUpdate(Job job) { JobStatus status = job.getStatus(); @@ -109,6 +143,35 @@ public class DefaultJobManager implements JobManager { } } + @EventListener + public void onJobUpdateEvent(ComponentLifecycleMsg event) { + EntityId entityId = event.getEntityId(); + if (entityId.getEntityType() != EntityType.JOB) { + return; + } + + ComponentLifecycleEvent lifecycleEvent = event.getEvent(); + if (!lifecycleEvent.equals(ComponentLifecycleEvent.STOPPED) && + !lifecycleEvent.equals(ComponentLifecycleEvent.FAILED) && + !lifecycleEvent.equals(ComponentLifecycleEvent.UPDATED)) { + return; + } + JobId jobId = new JobId(entityId.getId()); + TbCallback callback = finishCallbacks.remove(jobId); + if (callback == null) { + return; + } + executor.execute(() -> { + try { + Job job = jobService.findJobById(event.getTenantId(), jobId); + invokeFinishCallback(job, callback); + } catch (Throwable e) { + log.error("[{}] Failed to invoke finish callback", jobId, e); + callback.onFailure(e); + } + }); + } + private void processJob(Job job) { TenantId tenantId = job.getTenantId(); JobId jobId = job.getId(); @@ -195,12 +258,41 @@ public class DefaultJobManager implements JobManager { }); } + private void invokeFinishCallback(@Nullable Job job, TbCallback callback) { + if (job == null) { + callback.onFailure(new RuntimeException("Job not found")); + } else if (job.getStatus() == JobStatus.COMPLETED) { + callback.onSuccess(); + } else { + callback.onFailure(new RuntimeException(job.getError())); + } + } + + private void cleanupStaleCallbacks() { + finishCallbacks.entrySet().removeIf(entry -> { + JobId jobId = entry.getKey(); + try { + Job job = jobService.findJobById(TenantId.SYS_TENANT_ID, jobId); + if (job == null || job.getStatus().isOneOf(JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)) { + invokeFinishCallback(job, entry.getValue()); + return true; + } + return false; + } catch (Throwable e) { + log.error("[{}] Failed to cleanup stale callback", jobId, e); + entry.getValue().onFailure(e); + return true; + } + }); + } + private JobProcessor getJobProcessor(JobType jobType) { return jobProcessors.get(jobType); } @PreDestroy private void destroy() { + cleanupScheduler.shutdownNow(); executor.shutdownNow(); } diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index c17a9f6bfb..63fa4a7da4 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.job; +import com.google.common.util.concurrent.SettableFuture; import lombok.SneakyThrows; import org.junit.After; import org.junit.Before; @@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.DummyTaskResult; import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFailure; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.dao.job.JobDao; import org.thingsboard.server.dao.service.DaoSqlTest; @@ -44,10 +46,12 @@ import org.thingsboard.server.queue.task.JobStatsService; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -518,19 +522,114 @@ public class JobManagerTest extends AbstractControllerTest { }); } + @Test + public void testSubmitJob_finishCallback_success() { + SettableFuture future = SettableFuture.create(); + + int tasksCount = 3; + submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(100) + .build(), "test-job", TbCallback.wrap(future)); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isNull(); + }); + } + + @Test + public void testSubmitJob_finishCallback_taskFailure() { + SettableFuture future = SettableFuture.create(); + + submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(1) + .failedTasksCount(2) + .errors(List.of("task error")) + .retries(0) + .taskProcessingTimeMs(100) + .build(), "test-job", TbCallback.wrap(future)); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(future.isDone()).isTrue(); + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .cause() + .hasMessage("task error; task error"); + }); + } + + @Test + public void testSubmitJob_finishCallback_generalError() { + SettableFuture future = SettableFuture.create(); + + submitJob(DummyJobConfiguration.builder() + .generalError("Something went wrong") + .submittedTasksBeforeGeneralError(0) + .build(), "test-job", TbCallback.wrap(future)); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(future.isDone()).isTrue(); + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .cause() + .hasMessage("Something went wrong"); + }); + } + + @Test + public void testSubmitJob_finishCallback_cancelled() throws Exception { + SettableFuture future = SettableFuture.create(); + + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(200) + .taskProcessingTimeMs(50) + .build(), "test-job", TbCallback.wrap(future)).getId(); + + Thread.sleep(500); + cancelJob(jobId); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(future.isDone()).isTrue(); + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .cause() + .hasMessage("The task was cancelled"); + }); + } + + @Test + public void testSubmitJob_finishCallback_zeroTasks() { + SettableFuture future = SettableFuture.create(); + + submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(0) + .build(), "test-job", TbCallback.wrap(future)); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isNull(); + }); + } + private Job submitJob(DummyJobConfiguration configuration) { return submitJob(configuration, "test-job"); } @SneakyThrows private Job submitJob(DummyJobConfiguration configuration, String key) { + return submitJob(configuration, key, null); + } + + @SneakyThrows + private Job submitJob(DummyJobConfiguration configuration, String key, TbCallback callback) { return jobManager.submitJob(Job.builder() .tenantId(tenantId) .type(JobType.DUMMY) .key(key) .entityId(jobEntity.getId()) .configuration(configuration) - .build()).get(); + .build(), callback).get(); } private List getFailures(JobResult jobResult) { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java index f68012d793..d99fdf1961 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.job; +import com.fasterxml.jackson.annotation.JsonIgnore; import jakarta.validation.Valid; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; @@ -29,6 +30,7 @@ import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.job.task.TaskResult; import java.util.Set; import java.util.UUID; @@ -82,4 +84,36 @@ public class Job extends BaseData implements HasTenantId { return (C) configuration; } + @JsonIgnore + public String getError() { + if (status == JobStatus.CANCELLED) { + return "The task was cancelled"; + } + if (result.getGeneralError() != null) { + return result.getGeneralError(); + } + if (result.getFailedCount() > 0 && result.getResults() != null) { + StringBuilder errorMessage = new StringBuilder(); + for (TaskResult taskResult : result.getResults()) { + if (taskResult.isSuccess() || taskResult.isDiscarded()) { + continue; + } + String error = taskResult.getError(); + if (error == null) { + continue; + } + if (!errorMessage.isEmpty()) { + if (errorMessage.length() + 2 + error.length() > 256) { + errorMessage.append("..."); + break; + } + errorMessage.append("; "); + } + errorMessage.append(error); + } + return errorMessage.toString(); + } + return "Task failed"; + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java index e6a4702b81..37f851f611 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTaskResult.java @@ -64,6 +64,11 @@ public class DummyTaskResult extends TaskResult { return JobType.DUMMY; } + @Override + public String getError() { + return failure != null ? failure.getError() : null; + } + @Data @NoArgsConstructor @EqualsAndHashCode(callSuper = true) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java index 761b518e98..808738a78b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/TaskResult.java @@ -46,4 +46,7 @@ public abstract class TaskResult { @JsonIgnore public abstract JobType getJobType(); + @JsonIgnore + public abstract String getError(); + } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java index e4cb573cfe..f12d59cd0a 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java @@ -19,11 +19,14 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; +import org.thingsboard.server.common.msg.queue.TbCallback; public interface JobManager { ListenableFuture submitJob(Job job); // TODO: rate limits + ListenableFuture submitJob(Job job, TbCallback finishCallback); + void cancelJob(TenantId tenantId, JobId jobId); void reprocessJob(TenantId tenantId, JobId jobId);