From 60fdb7df3de458d75835698be2bc633925501dd6 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 28 Apr 2025 17:24:14 +0300 Subject: [PATCH] Multiple queued jobs of the same type --- .../server/controller/JobController.java | 2 +- .../entitiy/EntityStateSourcingListener.java | 7 +- .../job/CfReprocessingJobProcessor.java | 7 +- .../server/service/job/DefaultJobManager.java | 48 +++++-- .../server/service/job/DummyJobProcessor.java | 11 +- .../server/service/job/JobManager.java | 2 + .../server/service/job/JobProcessor.java | 8 +- .../server/service/job/JobManagerTest.java | 119 +++++++++++++++++- .../src/test/resources/logback-test.xml | 2 +- .../server/dao/{task => job}/JobService.java | 4 +- .../common/data/job/CfReprocessingTask.java | 2 + .../data/job/DummyJobConfiguration.java | 3 + .../server/common/data/job/DummyTask.java | 2 + .../server/common/data/job/Job.java | 3 +- .../server/common/data/job/JobResult.java | 5 +- .../server/common/data/job/JobStatus.java | 16 ++- .../server/common/data/job/TaskResult.java | 2 +- .../server/queue/task/JobStatsService.java | 2 +- .../server/queue/task/TaskProcessor.java | 16 +-- .../dao/{task => job}/DefaultJobService.java | 119 ++++++++++++------ .../server/dao/{task => job}/JobDao.java | 4 +- .../dao/sql/{task => job}/JobRepository.java | 49 ++------ .../dao/sql/{task => job}/JpaJobDao.java | 10 +- 23 files changed, 314 insertions(+), 129 deletions(-) rename common/dao-api/src/main/java/org/thingsboard/server/dao/{task => job}/JobService.java (92%) rename dao/src/main/java/org/thingsboard/server/dao/{task => job}/DefaultJobService.java (54%) rename dao/src/main/java/org/thingsboard/server/dao/{task => job}/JobDao.java (90%) rename dao/src/main/java/org/thingsboard/server/dao/sql/{task => job}/JobRepository.java (56%) rename dao/src/main/java/org/thingsboard/server/dao/sql/{task => job}/JpaJobDao.java (87%) diff --git a/application/src/main/java/org/thingsboard/server/controller/JobController.java b/application/src/main/java/org/thingsboard/server/controller/JobController.java index d315a522ae..5718d6e388 100644 --- a/application/src/main/java/org/thingsboard/server/controller/JobController.java +++ b/application/src/main/java/org/thingsboard/server/controller/JobController.java @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.dao.task.JobService; +import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.job.JobManager; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index 68fb0bb7cf..f70354c3a8 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; +import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -59,6 +60,7 @@ import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.service.job.JobManager; import java.util.Set; @@ -70,6 +72,7 @@ public class EntityStateSourcingListener { private final TenantService tenantService; private final TbClusterService tbClusterService; private final EdgeSynchronizationManager edgeSynchronizationManager; + private final JobManager jobManager; @PostConstruct public void init() { @@ -300,7 +303,9 @@ public class EntityStateSourcingListener { } private void onJobUpdate(Job job) { - if (job.getResult().getCancellationTs() > 0) { + jobManager.onJobUpdate(job); + if (job.getResult().getCancellationTs() > 0 || job.getStatus().isOneOf(JobStatus.FAILED)) { + // task processors will add this job to the list of discarded tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED); } } diff --git a/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java index b5f6c4665f..edb38da5c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.job; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.id.AssetProfileId; @@ -36,15 +35,13 @@ import java.util.function.Consumer; @Component @RequiredArgsConstructor -public class CfReprocessingJobProcessor extends JobProcessor { +public class CfReprocessingJobProcessor implements JobProcessor { private final DeviceService deviceService; private final AssetService assetService; - // fixme: multiple jobs with single type - @Transactional @Override - public int process(Job job, Consumer taskConsumer) { + public int process(Job job, Consumer taskConsumer) throws Exception { CfReprocessingJobConfiguration configuration = job.getConfiguration(); CalculatedField calculatedField = configuration.getCalculatedField(); diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 727beaf859..6554bebe9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -18,19 +18,22 @@ package org.thingsboard.server.service.job; import jakarta.annotation.PreDestroy; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobStats; +import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.dao.task.JobService; +import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; import org.thingsboard.server.queue.TbQueueCallback; @@ -64,6 +67,7 @@ public class DefaultJobManager implements JobManager { private final Map jobProcessors; private final Map>> taskProducers; private final QueueConsumerManager> jobStatsConsumer; + private final ExecutorService executor; private final ExecutorService consumerExecutor; @Value("${queue.tasks.stats.processing_interval_ms:5000}") @@ -74,6 +78,7 @@ public class DefaultJobManager implements JobManager { this.jobStatsService = jobStatsService; this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); + this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass()); this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("job-stats-consumer")); this.jobStatsConsumer = QueueConsumerManager.>builder() .name("job-stats") @@ -92,22 +97,40 @@ public class DefaultJobManager implements JobManager { @Override public Job submitJob(Job job) { - job = jobService.createJob(job.getTenantId(), job); - log.info("Submitting job: {}", job); + log.debug("Submitting job: {}", job); + return jobService.createJob(job.getTenantId(), job); + } - int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask); - jobStatsService.reportAllTasksSubmitted(job.getTenantId(), job.getId(), tasksCount); - return job; + @Override + public void onJobUpdate(Job job) { + if (job.getStatus() == JobStatus.PENDING) { + executor.execute(() -> { + TenantId tenantId = job.getTenantId(); + JobId jobId = job.getId(); + try { + int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted + log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount); + jobStatsService.reportAllTasksSubmitted(tenantId, jobId, tasksCount); + } catch (Throwable e) { + log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e); + try { + jobService.markAsFailed(tenantId, jobId, ExceptionUtils.getStackTrace(e)); + } catch (Throwable e2) { + log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2); + } + } + }); + } } @Override public void cancelJob(TenantId tenantId, JobId jobId) { - log.info("Cancelling job: {}", jobId); + log.info("[{}][{}] Cancelling job", tenantId, jobId); jobService.cancelJob(tenantId, jobId); } private void submitTask(Task task) { - log.info("Submitting task: {}", task); + log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task); TaskProto taskProto = TaskProto.newBuilder() .setValue(JacksonUtil.toString(task)) .build(); @@ -147,22 +170,23 @@ public class DefaultJobManager implements JobManager { } stats.forEach((jobId, jobStats) -> { + TenantId tenantId = jobStats.getTenantId(); try { - TenantId tenantId = jobStats.getTenantId(); - log.info("[{}][{}] Processing job stats: {}", tenantId, jobId, stats); + log.debug("[{}][{}] Processing job stats: {}", tenantId, jobId, stats); jobService.processStats(tenantId, jobId, jobStats); } catch (Exception e) { - log.warn("Failed to process job stats for {}: {}", jobId, jobStats, e); + log.error("[{}][{}] Failed to process job stats: {}", tenantId, jobId, jobStats, e); } }); consumer.commit(); - Thread.sleep(statsProcessingInterval); + Thread.sleep(statsProcessingInterval); // todo: test with bigger interval } @PreDestroy private void destroy() { jobStatsConsumer.stop(); + executor.shutdownNow(); consumerExecutor.shutdownNow(); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java index bed8f3f25e..cda7201e1d 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java @@ -28,11 +28,18 @@ import java.util.function.Consumer; @Component @RequiredArgsConstructor -public class DummyJobProcessor extends JobProcessor { +public class DummyJobProcessor implements JobProcessor { @Override - public int process(Job job, Consumer taskConsumer) { + public int process(Job job, Consumer taskConsumer) throws Exception { DummyJobConfiguration configuration = job.getConfiguration(); + if (configuration.getGeneralError() != null) { + for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) { + taskConsumer.accept(createTask(job, configuration, number, null)); + } + Thread.sleep(configuration.getTaskProcessingTimeMs() * (configuration.getSubmittedTasksBeforeGeneralError() / 2)); // sleeping so that some tasks are processed + throw new RuntimeException(configuration.getGeneralError()); + } for (int number = 1; number <= configuration.getSuccessfulTasksCount(); number++) { taskConsumer.accept(createTask(job, configuration, number, null)); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobManager.java b/application/src/main/java/org/thingsboard/server/service/job/JobManager.java index 71ff3dcaa2..3932361f3f 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobManager.java @@ -25,4 +25,6 @@ public interface JobManager { void cancelJob(TenantId tenantId, JobId jobId); + void onJobUpdate(Job job); + } diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java index 01f7291dd3..2431134e99 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java @@ -15,16 +15,16 @@ */ package org.thingsboard.server.service.job; -import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.common.data.job.Task; import java.util.function.Consumer; -public abstract class JobProcessor { +public interface JobProcessor { - public abstract int process(Job job, Consumer taskConsumer); + int process(Job job, Consumer taskConsumer) throws Exception; - public abstract JobType getType(); + JobType getType(); } diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index b01d0099df..173a528e14 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -33,12 +33,14 @@ import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.controller.AbstractControllerTest; +import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.service.DaoSqlTest; -import org.thingsboard.server.dao.task.JobService; import org.thingsboard.server.queue.task.JobStatsService; import org.thingsboard.server.service.job.task.DummyTaskProcessor; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -157,7 +159,7 @@ public class JobManagerTest extends AbstractControllerTest { Job job = findJobById(jobId); assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1); - assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1); + assertThat(job.getResult().getDiscardedCount()).isBetween(1, tasksCount - 1); assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); }); @@ -184,15 +186,14 @@ public class JobManagerTest extends AbstractControllerTest { inv.callRealMethod(); } return null; - }).when(taskProcessor).addToCancelledJobs(any()); // ignoring cancellation event, + }).when(taskProcessor).addToDiscardedJobs(any()); // ignoring cancellation event, jobManager.cancelJob(tenantId, jobId); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { Job job = findJobById(jobId); - System.err.println(job); assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1); - assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1); + assertThat(job.getResult().getDiscardedCount()).isBetween(1, tasksCount - 1); assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); }); @@ -224,12 +225,118 @@ public class JobManagerTest extends AbstractControllerTest { Assertions.assertThat(jobService.findJobsByTenantId(tenantId, new PageLink(100, 0)).getData()).isEmpty(); } + @Test + public void testSubmitMultipleJobs() { + int tasksCount = 3; + int jobsCount = 3; + for (int i = 1; i <= jobsCount; i++) { + Job job = Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job-" + i) + .description("test job") + .configuration(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(1000) + .build()) + .build(); + jobManager.submitJob(job); + } + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + List jobs = findJobs(); + assertThat(jobs).hasSize(jobsCount); + Job firstJob = jobs.get(2); // ordered by createdTime descending + assertThat(firstJob.getStatus()).isEqualTo(JobStatus.RUNNING); + Job secondJob = jobs.get(1); + assertThat(secondJob.getStatus()).isEqualTo(JobStatus.QUEUED); + Job thirdJob = jobs.get(0); + assertThat(thirdJob.getStatus()).isEqualTo(JobStatus.QUEUED); + }); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + List jobs = findJobs(); + for (Job job : jobs) { + assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); + assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount); + assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); + } + }); + } + + @Test + public void testCancelQueuedJob() { + int tasksCount = 3; + int jobsCount = 3; + List jobIds = new ArrayList<>(); + for (int i = 1; i <= jobsCount; i++) { + Job job = Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job-" + i) + .description("test job") + .configuration(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(1000) + .build()) + .build(); + jobIds.add(jobManager.submitJob(job).getId()); + } + + for (int i = 1; i < jobIds.size(); i++) { + jobManager.cancelJob(tenantId, jobIds.get(i)); + } + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + List jobs = findJobs(); + + Job firstJob = jobs.get(2); + assertThat(firstJob.getStatus()).isEqualTo(JobStatus.COMPLETED); + assertThat(firstJob.getResult().getSuccessfulCount()).isEqualTo(tasksCount); + assertThat(firstJob.getResult().getTotalCount()).isEqualTo(tasksCount); + + Job secondJob = jobs.get(1); + assertThat(secondJob.getStatus()).isEqualTo(JobStatus.CANCELLED); + assertThat(secondJob.getResult().getCompletedCount()).isZero(); + + Job thirdJob = jobs.get(0); + assertThat(thirdJob.getStatus()).isEqualTo(JobStatus.CANCELLED); + assertThat(thirdJob.getResult().getCompletedCount()).isZero(); + }); + } + + @Test + public void testGeneralJobError() { + int submittedTasks = 100; + JobId jobId = jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job") + .description("test job") + .configuration(DummyJobConfiguration.builder() + .generalError("Some error while submitting tasks") + .submittedTasksBeforeGeneralError(submittedTasks) + .taskProcessingTimeMs(10) + .build()) + .build()).getId(); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); + assertThat(job.getResult().getSuccessfulCount()).isBetween(1, submittedTasks); + assertThat(job.getResult().getDiscardedCount()).isBetween(1, submittedTasks); + assertThat(job.getResult().getTotalCount()).isNull(); + }); + } + + // todo: job with zero tasks, reprocessing + private Job findJobById(JobId jobId) throws Exception { return doGet("/api/job/" + jobId, Job.class); } private List findJobs() throws Exception { - return doGetTypedWithPageLink("/api/jobs?", new TypeReference>() {}, new PageLink(100, 0)).getData(); + return doGetTypedWithPageLink("/api/jobs?", new TypeReference>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData(); } } \ No newline at end of file diff --git a/application/src/test/resources/logback-test.xml b/application/src/test/resources/logback-test.xml index a0efcf52c1..13c93da411 100644 --- a/application/src/test/resources/logback-test.xml +++ b/application/src/test/resources/logback-test.xml @@ -9,7 +9,7 @@ - + diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java similarity index 92% rename from common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java rename to common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java index 9ce802b84f..62fd60eaac 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.task; +package org.thingsboard.server.dao.job; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; @@ -31,6 +31,8 @@ public interface JobService extends EntityDaoService { void cancelJob(TenantId tenantId, JobId jobId); + void markAsFailed(TenantId tenantId, JobId jobId, String error); + void processStats(TenantId tenantId, JobId jobId, JobStats jobStats); PageData findJobsByTenantId(TenantId tenantId, PageLink pageLink); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java index 5c380c4dfa..8846a6a12f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.ToString; import lombok.experimental.SuperBuilder; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.id.EntityId; @@ -26,6 +27,7 @@ import org.thingsboard.server.common.data.id.EntityId; @NoArgsConstructor @EqualsAndHashCode(callSuper = true) @SuperBuilder +@ToString(callSuper = true) public class CfReprocessingTask extends Task { private CalculatedField calculatedField; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java index 7fe621c058..70daf6f9c9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java @@ -34,6 +34,9 @@ public class DummyJobConfiguration implements JobConfiguration { private List errors; private int retries; + private String generalError; + private int submittedTasksBeforeGeneralError; + @Override public JobType getType() { return JobType.DUMMY; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java index dee97fc3c9..ae93ed06bb 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.ToString; import lombok.experimental.SuperBuilder; import java.util.List; @@ -26,6 +27,7 @@ import java.util.List; @NoArgsConstructor @EqualsAndHashCode(callSuper = true) @SuperBuilder +@ToString(callSuper = true) public class DummyTask extends Task { private int number; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java index 5a51a49239..e96d42cad1 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java @@ -21,6 +21,7 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.ToString; import org.thingsboard.server.common.data.BaseData; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.JobId; @@ -28,6 +29,7 @@ import org.thingsboard.server.common.data.id.TenantId; @Data @NoArgsConstructor +@ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) public class Job extends BaseData implements HasTenantId { @@ -51,7 +53,6 @@ public class Job extends BaseData implements HasTenantId { this.key = key; this.description = description; this.configuration = configuration; - this.status = JobStatus.PENDING; this.result = switch (type) { case CF_REPROCESSING -> new CfReprocessingJobResult(); case DUMMY -> new DummyJobResult(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java index abaa86facb..b517877906 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java @@ -39,15 +39,16 @@ public abstract class JobResult implements Serializable { private int successfulCount; private int failedCount; - private int cancelledCount; + private int discardedCount; private Integer totalCount = null; // set when all tasks are submitted private Map failures = new HashMap<>(); + private String generalError; private long cancellationTs; @JsonIgnore public int getCompletedCount() { - return successfulCount + failedCount + cancelledCount; + return successfulCount + failedCount + discardedCount; } public abstract JobType getJobType(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java index 026e19c5b2..17d050a540 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java @@ -16,9 +16,23 @@ package org.thingsboard.server.common.data.job; public enum JobStatus { + QUEUED, PENDING, RUNNING, COMPLETED, FAILED, - CANCELLED + CANCELLED; + + public boolean isOneOf(JobStatus... statuses) { + if (statuses == null) { + return false; + } + for (JobStatus status : statuses) { + if (this == status) { + return true; + } + } + return false; + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java index 57f2f44d7a..468173bdf3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java @@ -27,7 +27,7 @@ import lombok.NoArgsConstructor; public class TaskResult { private boolean success; - private boolean cancelled; + private boolean discarded; private TaskFailure failure; @Data diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java index ceba2645f9..8d69f5781b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java @@ -52,7 +52,7 @@ public class JobStatsService { } private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) { - log.info("[{}] Reporting: {}", jobId, statsMsg); + log.debug("[{}] Reporting: {}", jobId, statsMsg); statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setJobIdMSB(jobId.getId().getMostSignificantBits()) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 2cf0f22b90..f685373f75 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -57,7 +57,7 @@ public abstract class TaskProcessor { private ExecutorService consumerExecutor; private final Set deletedTenants = ConcurrentHashMap.newKeySet(); - private final Set cancelledJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine + private final Set discardedJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine @PostConstruct public void init() { @@ -83,14 +83,14 @@ public abstract class TaskProcessor { switch (entityId.getEntityType()) { case JOB -> { if (event.getEvent() == ComponentLifecycleEvent.STOPPED) { - log.info("Adding job {} to cancelledJobs", entityId); - addToCancelledJobs(entityId.getId()); + log.debug("Adding job {} to discarded", entityId); + addToDiscardedJobs(entityId.getId()); } } case TENANT -> { if (event.getEvent() == ComponentLifecycleEvent.DELETED) { deletedTenants.add(entityId.getId()); - log.info("Adding tenant {} to deletedTenants", entityId); + log.debug("Adding tenant {} to deleted", entityId); } } } @@ -100,7 +100,7 @@ public abstract class TaskProcessor { for (TbProtoQueueMsg msg : msgs) { try { Task task = JacksonUtil.fromString(msg.getValue().getValue(), Task.class); - if (cancelledJobs.contains(task.getJobId().getId())) { + if (discardedJobs.contains(task.getJobId().getId())) { log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId()); reportCancelled(task); continue; @@ -157,13 +157,13 @@ public abstract class TaskProcessor { private void reportCancelled(Task task) { TaskResult result = TaskResult.builder() - .cancelled(true) + .discarded(true) .build(); statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); } - public void addToCancelledJobs(UUID jobId) { - cancelledJobs.add(jobId); + public void addToDiscardedJobs(UUID jobId) { + discardedJobs.add(jobId); } @PreDestroy diff --git a/dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java similarity index 54% rename from dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java rename to dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index 16b4397d87..c632a7b58b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.task; +package org.thingsboard.server.dao.job; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -28,17 +28,24 @@ import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobStatus; +import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.data.job.TaskResult.TaskFailure; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; -import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.service.DataValidator; import java.util.Optional; +import static org.thingsboard.server.common.data.job.JobStatus.CANCELLED; +import static org.thingsboard.server.common.data.job.JobStatus.COMPLETED; +import static org.thingsboard.server.common.data.job.JobStatus.FAILED; +import static org.thingsboard.server.common.data.job.JobStatus.PENDING; +import static org.thingsboard.server.common.data.job.JobStatus.QUEUED; +import static org.thingsboard.server.common.data.job.JobStatus.RUNNING; + @Service @RequiredArgsConstructor @Slf4j @@ -47,10 +54,16 @@ public class DefaultJobService extends AbstractEntityService implements JobServi private final JobDao jobDao; private final JobValidator validator = new JobValidator(); + @Transactional @Override public Job createJob(TenantId tenantId, Job job) { validator.validate(job, Job::getTenantId); - return saveJob(tenantId, job, false); + if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) { + job.setStatus(QUEUED); + } else { + job.setStatus(PENDING); + } + return saveJob(tenantId, job, true, null); } @Override @@ -62,11 +75,27 @@ public class DefaultJobService extends AbstractEntityService implements JobServi @Override public void cancelJob(TenantId tenantId, JobId jobId) { Job job = findForUpdate(tenantId, jobId); - if (job.getStatus() != JobStatus.PENDING && job.getStatus() != JobStatus.RUNNING) { + if (!job.getStatus().isOneOf(QUEUED, PENDING, RUNNING)) { throw new IllegalArgumentException("Job already " + job.getStatus().name().toLowerCase()); } job.getResult().setCancellationTs(System.currentTimeMillis()); - saveJob(tenantId, job, true); + JobStatus prevStatus = job.getStatus(); + if (job.getStatus() == QUEUED) { + job.setStatus(CANCELLED); // setting cancelled status right away, because we don't expect stats for cancelled tasks + } else if (job.getStatus() == PENDING) { + job.setStatus(RUNNING); + } + saveJob(tenantId, job, true, prevStatus); + } + + @Transactional + @Override + public void markAsFailed(TenantId tenantId, JobId jobId, String error) { + Job job = findForUpdate(tenantId, jobId); + job.getResult().setGeneralError(error); + JobStatus prevStatus = job.getStatus(); + job.setStatus(FAILED); + saveJob(tenantId, job, true, prevStatus); } @Transactional @@ -74,39 +103,36 @@ public class DefaultJobService extends AbstractEntityService implements JobServi public void processStats(TenantId tenantId, JobId jobId, JobStats jobStats) { Job job = findForUpdate(tenantId, jobId); if (job == null) { - log.info("Got stale stats for job {}: {}", jobId, jobStats); + log.debug("[{}][{}] Got stale stats: {}", tenantId, jobId, jobStats); return; } - switch (job.getStatus()) { - case PENDING -> { - job.setStatus(JobStatus.RUNNING); - } - case CANCELLED, COMPLETED, FAILED -> { - // got some stale stats - return; - } + JobStatus prevStatus = job.getStatus(); + if (job.getStatus() == PENDING) { + job.setStatus(RUNNING); } - JobResult jobResult = job.getResult(); + JobResult result = job.getResult(); if (jobStats.getTotalTasksCount() != null) { - jobResult.setTotalCount(jobStats.getTotalTasksCount()); + result.setTotalCount(jobStats.getTotalTasksCount()); } boolean publishEvent = false; for (TaskResult taskResult : jobStats.getTaskResults()) { if (taskResult.isSuccess()) { - jobResult.setSuccessfulCount(jobResult.getSuccessfulCount() + 1); - } else if (taskResult.isCancelled()) { - jobResult.setCancelledCount(jobResult.getCancelledCount() + 1); + result.setSuccessfulCount(result.getSuccessfulCount() + 1); + } else if (taskResult.isDiscarded()) { + result.setDiscardedCount(result.getDiscardedCount() + 1); } else { TaskFailure failure = taskResult.getFailure(); String key = failure.getTask().getKey(); - jobResult.setFailedCount(jobResult.getFailedCount() + 1); - jobResult.getFailures().put(key, failure.getError()); + result.setFailedCount(result.getFailedCount() + 1); + if (result.getFailures().size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures + result.getFailures().put(key, failure.getError()); + } } - if (jobResult.getCancellationTs() > 0) { - if (!taskResult.isCancelled() && System.currentTimeMillis() > jobResult.getCancellationTs()) { + if (result.getCancellationTs() > 0) { + if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) { log.info("Got task result for cancelled job {}: {}, re-notifying processors about cancellation", jobId, taskResult); // task processor forgot the task is cancelled publishEvent = true; @@ -114,32 +140,49 @@ public class DefaultJobService extends AbstractEntityService implements JobServi } } - if (jobResult.getTotalCount() != null && jobResult.getCompletedCount() >= jobResult.getTotalCount()) { - if (jobResult.getCancellationTs() > 0) { - job.setStatus(JobStatus.CANCELLED); - } else if (jobResult.getFailedCount() > 0) { - job.setStatus(JobStatus.FAILED); - } else { - job.setStatus(JobStatus.COMPLETED); + if (job.getStatus() == RUNNING) { + if (result.getTotalCount() != null && result.getCompletedCount() >= result.getTotalCount()) { + if (result.getCancellationTs() > 0) { + job.setStatus(CANCELLED); + } else if (result.getFailedCount() > 0) { + job.setStatus(FAILED); + } else { + job.setStatus(COMPLETED); + } } } - log.info("Saving job {}", job); - saveJob(tenantId, job, publishEvent); + + saveJob(tenantId, job, publishEvent, prevStatus); } - private Job saveJob(TenantId tenantId, Job job, boolean publishEvent) { + private Job saveJob(TenantId tenantId, Job job, boolean publishEvent, JobStatus prevStatus) { job = jobDao.save(tenantId, job); if (publishEvent) { eventPublisher.publishEvent(SaveEntityEvent.builder() .tenantId(tenantId) .entityId(job.getId()) .entity(job) - .created(false) .build()); } + log.info("[{}] Saved job: {}", tenantId, job); + if (prevStatus != null && job.getStatus() != prevStatus) { + log.info("[{}][{}][{}] New job status: {} -> {}", tenantId, job.getId(), job.getType(), prevStatus, job.getStatus()); + if (job.getStatus().isOneOf(CANCELLED, COMPLETED, FAILED) && prevStatus != QUEUED) { // if prev status is QUEUED - means there are already running jobs with this type, no need to check for waiting job + checkWaitingJobs(tenantId, job.getType()); + } + } return job; } + private void checkWaitingJobs(TenantId tenantId, JobType jobType) { + Job queuedJob = jobDao.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId, jobType, QUEUED); + if (queuedJob == null) { + return; + } + queuedJob.setStatus(PENDING); + saveJob(tenantId, queuedJob, true, QUEUED); + } + @Override public PageData findJobsByTenantId(TenantId tenantId, PageLink pageLink) { return jobDao.findByTenantId(tenantId, pageLink); @@ -149,15 +192,15 @@ public class DefaultJobService extends AbstractEntityService implements JobServi return jobDao.findByIdForUpdate(tenantId, jobId); } - // todo: cancellation, reprocessing +// todo: reprocessing public class JobValidator extends DataValidator { @Override protected void validateCreate(TenantId tenantId, Job job) { - if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), JobStatus.PENDING, JobStatus.RUNNING)) { - throw new DataValidationException("Job of this type is already running"); - } +// if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) { +// throw new DataValidationException("Job of this type is already running"); +// } } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java b/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java similarity index 90% rename from dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java rename to dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java index 3b1d6631dd..799717fea8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.task; +package org.thingsboard.server.dao.job; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; @@ -34,4 +34,6 @@ public interface JobDao extends Dao { boolean existsByTenantIdAndTypeAndStatusOneOf(TenantId tenantId, JobType type, JobStatus... statuses); + Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java similarity index 56% rename from dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java rename to dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java index 472df05bdd..bec5bf5f87 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java @@ -13,15 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sql.task; +package org.thingsboard.server.dao.sql.job; import jakarta.persistence.LockModeType; -import jakarta.transaction.Transactional; +import org.springframework.data.domain.Limit; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Lock; -import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; @@ -36,7 +35,7 @@ import java.util.UUID; public interface JobRepository extends JpaRepository { @Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId " + - "AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true " + + "AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true " + "OR ilike(j.description, concat('%', :searchText, '%')) = true)") Page findByTenantIdAndSearchText(@Param("tenantId") UUID tenantId, @Param("searchText") String searchText, @@ -46,45 +45,13 @@ public interface JobRepository extends JpaRepository { @Query("SELECT j FROM JobEntity j WHERE j.id = :id") JobEntity findByIdForUpdate(UUID id); - @Modifying - @Transactional - @Query(value = """ - UPDATE job - SET result = jsonb_set( - result, - '{successfulCount}', - to_jsonb((result->>'successfulCount')::int + :count) - ) - WHERE id = :jobId - RETURNING ((result->>'successfulCount')::int + :count) - + (result->>'failedCount')::int = (result->>'totalCount')::int - """, nativeQuery = true) - boolean reportTaskSuccess(@Param("jobId") UUID jobId, - @Param("count") int count); - - @Modifying - @Transactional - @Query(value = """ - UPDATE job - SET result = jsonb_set( - jsonb_set( - result, - '{failedCount}', - to_jsonb((result->>'failedCount')::int + 1) - ), - ARRAY['failures', :taskKey], - to_jsonb(:error) - ) - WHERE id = :jobId - RETURNING ((result->>'failedCount')::int + 1) + (result->>'successfulCount')::int - = (result->>'totalCount')::int - """, nativeQuery = true) - boolean reportTaskFailure(@Param("jobId") UUID jobId, - @Param("taskKey") String taskKey, - @Param("error") String error); - boolean existsByKeyAndStatusIn(String key, List statuses); boolean existsByTenantIdAndTypeAndStatusIn(UUID tenantId, JobType type, List statuses); + @Lock(LockModeType.PESSIMISTIC_WRITE) // SELECT FOR UPDATE + @Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId AND j.type = :type " + + "AND j.status = :status ORDER BY j.createdTime ASC, j.id ASC") + JobEntity findOldestByTenantIdAndTypeAndStatusForUpdate(UUID tenantId, JobType type, JobStatus status, Limit limit); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java similarity index 87% rename from dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java rename to dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java index 92b9fc8a72..1b3a394e28 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.sql.task; +package org.thingsboard.server.dao.sql.job; import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.Limit; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EntityType; @@ -30,7 +31,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.JobEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; -import org.thingsboard.server.dao.task.JobDao; +import org.thingsboard.server.dao.job.JobDao; import org.thingsboard.server.dao.util.SqlDao; import java.util.Arrays; @@ -63,6 +64,11 @@ public class JpaJobDao extends JpaAbstractDao implements JobDao return jobRepository.existsByTenantIdAndTypeAndStatusIn(tenantId.getId(), type, Arrays.stream(statuses).toList()); } + @Override + public Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status) { + return DaoUtil.getData(jobRepository.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId.getId(), type, status, Limit.of(1))); + } + @Override public EntityType getEntityType() { return EntityType.JOB;