diff --git a/application/src/main/java/org/thingsboard/server/controller/JobController.java b/application/src/main/java/org/thingsboard/server/controller/JobController.java index 5718d6e388..9b6627e12a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/JobController.java +++ b/application/src/main/java/org/thingsboard/server/controller/JobController.java @@ -82,4 +82,11 @@ public class JobController extends BaseController { jobManager.cancelJob(getTenantId(), new JobId(id)); } + @PostMapping("/job/{id}/reprocess") + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')") + public void reprocessJob(@PathVariable UUID id) throws ThingsboardException { + // todo check permissions + jobManager.reprocessJob(getTenantId(), new JobId(id)); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java index edb38da5c2..79a735f6a6 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java @@ -24,19 +24,24 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.job.CfReprocessingJobConfiguration; import org.thingsboard.server.common.data.job.CfReprocessingTask; +import org.thingsboard.server.common.data.job.CfReprocessingTask.CfReprocessingTaskFailure; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.Task; +import org.thingsboard.server.common.data.job.TaskFailure; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.device.DeviceService; +import java.util.List; import java.util.function.Consumer; @Component @RequiredArgsConstructor public class CfReprocessingJobProcessor implements JobProcessor { + private final CalculatedFieldService calculatedFieldService; private final DeviceService deviceService; private final AssetService assetService; @@ -44,12 +49,12 @@ public class CfReprocessingJobProcessor implements JobProcessor { public int process(Job job, Consumer taskConsumer) throws Exception { CfReprocessingJobConfiguration configuration = job.getConfiguration(); - CalculatedField calculatedField = configuration.getCalculatedField(); + CalculatedField calculatedField = calculatedFieldService.findById(job.getTenantId(), configuration.getCalculatedFieldId()); EntityId cfEntityId = calculatedField.getEntityId(); int tasksCount = 0; if (cfEntityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) { - taskConsumer.accept(createTask(job, configuration, cfEntityId)); + taskConsumer.accept(createTask(job, configuration, calculatedField, cfEntityId)); tasksCount++; } else { PageDataIterable entities; @@ -61,20 +66,31 @@ public class CfReprocessingJobProcessor implements JobProcessor { throw new IllegalArgumentException("Unsupported CF entity type " + cfEntityId.getEntityType()); } for (EntityId entityId : entities) { - taskConsumer.accept(createTask(job, configuration, entityId)); + taskConsumer.accept(createTask(job, configuration, calculatedField, entityId)); tasksCount++; } } return tasksCount; } - private Task createTask(Job job, CfReprocessingJobConfiguration configuration, EntityId entityId) { + @Override + public void reprocess(Job job, List failures, Consumer taskConsumer) throws Exception { + CfReprocessingJobConfiguration configuration = job.getConfiguration(); + CalculatedField calculatedField = calculatedFieldService.findById(job.getTenantId(), configuration.getCalculatedFieldId()); + + for (TaskFailure failure : failures) { + CfReprocessingTaskFailure taskFailure = (CfReprocessingTaskFailure) failure; + EntityId entityId = taskFailure.getEntityId(); + taskConsumer.accept(createTask(job, job.getConfiguration(), calculatedField, entityId)); + } + } + + private Task createTask(Job job, CfReprocessingJobConfiguration configuration, CalculatedField calculatedField, EntityId entityId) { return CfReprocessingTask.builder() .tenantId(job.getTenantId()) .jobId(job.getId()) - .key(entityId.getEntityType().getNormalName() + " " + entityId.getId()) .retries(2) // 3 attempts in total - .calculatedField(configuration.getCalculatedField()) + .calculatedField(calculatedField) .entityId(entityId) .startTs(configuration.getStartTs()) .endTs(configuration.getEndTs()) diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 6554bebe9f..b72974144d 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -27,10 +27,12 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; +import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.Task; +import org.thingsboard.server.common.data.job.TaskFailure; import org.thingsboard.server.common.data.job.TaskResult; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.job.JobService; @@ -48,6 +50,7 @@ import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -98,37 +101,73 @@ public class DefaultJobManager implements JobManager { @Override public Job submitJob(Job job) { log.debug("Submitting job: {}", job); - return jobService.createJob(job.getTenantId(), job); + return jobService.submitJob(job.getTenantId(), job); } @Override public void onJobUpdate(Job job) { if (job.getStatus() == JobStatus.PENDING) { executor.execute(() -> { - TenantId tenantId = job.getTenantId(); - JobId jobId = job.getId(); - try { - int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted - log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount); - jobStatsService.reportAllTasksSubmitted(tenantId, jobId, tasksCount); - } catch (Throwable e) { - log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e); - try { - jobService.markAsFailed(tenantId, jobId, ExceptionUtils.getStackTrace(e)); - } catch (Throwable e2) { - log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2); - } - } + processJob(job); }); } } + private void processJob(Job job) { + TenantId tenantId = job.getTenantId(); + JobId jobId = job.getId(); + try { + JobProcessor processor = jobProcessors.get(job.getType()); + List toReprocess = job.getConfiguration().getToReprocess(); + if (toReprocess == null) { + int tasksCount = processor.process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted + log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount); + jobStatsService.reportAllTasksSubmitted(tenantId, jobId, tasksCount); + } else { + processor.reprocess(job, toReprocess, this::submitTask); + log.info("[{}][{}][{}] Submitted {} tasks for reprocessing", tenantId, jobId, job.getType(), toReprocess.size()); + } + } catch (Throwable e) { + log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e); + try { + jobService.markAsFailed(tenantId, jobId, ExceptionUtils.getStackTrace(e)); + } catch (Throwable e2) { + log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2); + } + } + } + @Override public void cancelJob(TenantId tenantId, JobId jobId) { log.info("[{}][{}] Cancelling job", tenantId, jobId); jobService.cancelJob(tenantId, jobId); } + @Override + public void reprocessJob(TenantId tenantId, JobId jobId) { + log.info("[{}][{}] Reprocessing job", tenantId, jobId); + Job job = jobService.findJobById(tenantId, jobId); + if (job.getStatus() != JobStatus.FAILED) { + throw new IllegalArgumentException("Job is not failed"); + } + + JobResult result = job.getResult(); + if (result.getGeneralError() != null) { + throw new IllegalArgumentException("Reprocessing not allowed since job has general error"); + } + List failures = result.getFailures(); + if (result.getFailedCount() > failures.size()) { + throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + failures.size() + ")"); + } + + result.setFailedCount(0); + result.setFailures(Collections.emptyList()); + + job.getConfiguration().setToReprocess(failures); + + jobService.submitJob(tenantId, job); + } + private void submitTask(Task task) { log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task); TaskProto taskProto = TaskProto.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java index cda7201e1d..900b876c09 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java @@ -19,10 +19,13 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.job.DummyJobConfiguration; import org.thingsboard.server.common.data.job.DummyTask; +import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.Task; +import org.thingsboard.server.common.data.job.TaskFailure; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; @@ -35,31 +38,48 @@ public class DummyJobProcessor implements JobProcessor { DummyJobConfiguration configuration = job.getConfiguration(); if (configuration.getGeneralError() != null) { for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) { - taskConsumer.accept(createTask(job, configuration, number, null)); + taskConsumer.accept(createTask(job, configuration, number, null, false)); } Thread.sleep(configuration.getTaskProcessingTimeMs() * (configuration.getSubmittedTasksBeforeGeneralError() / 2)); // sleeping so that some tasks are processed throw new RuntimeException(configuration.getGeneralError()); } - for (int number = 1; number <= configuration.getSuccessfulTasksCount(); number++) { - taskConsumer.accept(createTask(job, configuration, number, null)); + + int taskNumber = 1; + for (int i = 0; i < configuration.getSuccessfulTasksCount(); i++) { + taskConsumer.accept(createTask(job, configuration, taskNumber, null, false)); + taskNumber++; } if (configuration.getErrors() != null) { - for (int number = 1; number <= configuration.getFailedTasksCount(); number++) { - taskConsumer.accept(createTask(job, configuration, number, configuration.getErrors())); + for (int i = 0; i < configuration.getFailedTasksCount(); i++) { + taskConsumer.accept(createTask(job, configuration, taskNumber, configuration.getErrors(), false)); + taskNumber++; + } + for (int i = 0; i < configuration.getPermanentlyFailedTasksCount(); i++) { + taskConsumer.accept(createTask(job, configuration, taskNumber, configuration.getErrors(), true)); + taskNumber++; } } - return configuration.getSuccessfulTasksCount() + configuration.getFailedTasksCount(); + return configuration.getSuccessfulTasksCount() + configuration.getFailedTasksCount() + configuration.getPermanentlyFailedTasksCount(); + } + + @Override + public void reprocess(Job job, List failures, Consumer taskConsumer) throws Exception { + for (TaskFailure failure : failures) { + DummyTaskFailure taskFailure = (DummyTaskFailure) failure; + taskConsumer.accept(createTask(job, job.getConfiguration(), taskFailure.getNumber(), taskFailure.isFailAlways() ? + List.of(taskFailure.getError()) : Collections.emptyList(), taskFailure.isFailAlways())); + } } - private Task createTask(Job job, DummyJobConfiguration configuration, int number, List errors) { + private Task createTask(Job job, DummyJobConfiguration configuration, int number, List errors, boolean failAlways) { return DummyTask.builder() .tenantId(job.getTenantId()) .jobId(job.getId()) - .key("Task " + number) .retries(configuration.getRetries()) .number(number) .processingTimeMs(configuration.getTaskProcessingTimeMs()) .errors(errors) + .failAlways(failAlways) .build(); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobManager.java b/application/src/main/java/org/thingsboard/server/service/job/JobManager.java index 3932361f3f..8e4858ebe3 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobManager.java @@ -25,6 +25,8 @@ public interface JobManager { void cancelJob(TenantId tenantId, JobId jobId); + void reprocessJob(TenantId tenantId, JobId jobId); + void onJobUpdate(Job job); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java index 2431134e99..da2c75d166 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java @@ -18,13 +18,17 @@ package org.thingsboard.server.service.job; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.Task; +import org.thingsboard.server.common.data.job.TaskFailure; +import java.util.List; import java.util.function.Consumer; public interface JobProcessor { int process(Job job, Consumer taskConsumer) throws Exception; + void reprocess(Job job, List failures, Consumer taskConsumer) throws Exception; + JobType getType(); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java index dc4a193bb9..73a27a0012 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java @@ -30,6 +30,9 @@ public class DummyTaskProcessor extends TaskProcessor { if (task.getProcessingTimeMs() > 0) { Thread.sleep(task.getProcessingTimeMs()); } + if (task.isFailAlways()) { + throw new RuntimeException(task.getErrors().get(0)); + } if (task.getErrors() != null && task.getAttempt() <= task.getErrors().size()) { String error = task.getErrors().get(task.getAttempt() - 1); throw new RuntimeException(error); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 7e50168786..d580a6b12a 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -1264,4 +1264,12 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return doGetTypedWithPageLink("/api/jobs?", new TypeReference>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData(); } + protected void cancelJob(JobId jobId) throws Exception { + doPost("/api/job/" + jobId + "/cancel").andExpect(status().isOk()); + } + + protected void reprocessJob(JobId jobId) throws Exception { + doPost("/api/job/" + jobId + "/reprocess").andExpect(status().isOk()); + } + } diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index 784b5bb2dd..f9881d0248 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -26,6 +26,7 @@ import org.springframework.test.context.TestPropertySource; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.DummyJobConfiguration; +import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobResult; import org.thingsboard.server.common.data.job.JobStatus; @@ -130,8 +131,8 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks); - assertThat(jobResult.getFailures().get("Task 1")).isEqualTo("error3"); // last error - assertThat(jobResult.getFailures().get("Task 2")).isEqualTo("error3"); // last error + assertThat(jobResult.getFailures().get(0).getError()).isEqualTo("error3"); // last error + assertThat(jobResult.getFailures().get(1).getError()).isEqualTo("error3"); // last error assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount()); }); } @@ -151,7 +152,7 @@ public class JobManagerTest extends AbstractControllerTest { .build()).getId(); Thread.sleep(500); - jobManager.cancelJob(tenantId, jobId); + cancelJob(jobId); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { Job job = findJobById(jobId); assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); @@ -163,7 +164,7 @@ public class JobManagerTest extends AbstractControllerTest { } @Test - public void testCancelJob_simulateTaskProcessorRestart() { + public void testCancelJob_simulateTaskProcessorRestart() throws Exception { int tasksCount = 10; JobId jobId = jobManager.submitJob(Job.builder() .tenantId(tenantId) @@ -184,7 +185,7 @@ public class JobManagerTest extends AbstractControllerTest { } return null; }).when(taskProcessor).addToDiscardedJobs(any()); // ignoring cancellation event, - jobManager.cancelJob(tenantId, jobId); + cancelJob(jobId); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { Job job = findJobById(jobId); @@ -262,7 +263,7 @@ public class JobManagerTest extends AbstractControllerTest { } @Test - public void testCancelQueuedJob() { + public void testCancelQueuedJob() throws Exception { int tasksCount = 3; int jobsCount = 3; List jobIds = new ArrayList<>(); @@ -281,7 +282,7 @@ public class JobManagerTest extends AbstractControllerTest { } for (int i = 1; i < jobIds.size(); i++) { - jobManager.cancelJob(tenantId, jobIds.get(i)); + cancelJob(jobIds.get(i)); } await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -326,6 +327,104 @@ public class JobManagerTest extends AbstractControllerTest { }); } - // todo: job with zero tasks, reprocessing + @Test + public void testJobReprocessing() throws Exception { + int successfulTasks = 3; + int failedTasks = 2; + int totalTasksCount = successfulTasks + failedTasks; + JobId jobId = jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job") + .description("test job") + .configuration(DummyJobConfiguration.builder() + .successfulTasksCount(successfulTasks) + .failedTasksCount(failedTasks) + .errors(List.of("error")) + .taskProcessingTimeMs(100) + .build()) + .build()).getId(); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); + JobResult jobResult = job.getResult(); + assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); + assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); + + for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { + DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); + assertThat(failure.getNumber()).isEqualTo(taskNumber); + assertThat(failure.getError()).isEqualTo("error"); + } + }); + + reprocessJob(jobId); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); + assertThat(job.getResult().getSuccessfulCount()).isEqualTo(totalTasksCount); + assertThat(job.getResult().getFailedCount()).isZero(); + assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount); + assertThat(job.getResult().getFailures()).isEmpty(); + }); + } + + @Test + public void testJobReprocessing_somePermanentlyFailed() throws Exception { + int successfulTasks = 3; + int failedTasks = 2; + int permanentlyFailedTasks = 1; + int totalTasksCount = successfulTasks + failedTasks + permanentlyFailedTasks; + JobId jobId = jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key("test-job") + .description("test job") + .configuration(DummyJobConfiguration.builder() + .successfulTasksCount(successfulTasks) + .failedTasksCount(failedTasks) + .permanentlyFailedTasksCount(permanentlyFailedTasks) + .errors(List.of("error")) + .taskProcessingTimeMs(100) + .build()) + .build()).getId(); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); + JobResult jobResult = job.getResult(); + assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); + assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks + permanentlyFailedTasks); + assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); + + for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { + DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); + assertThat(failure.getNumber()).isEqualTo(taskNumber); + assertThat(failure.getError()).isEqualTo("error"); + } + }); + + reprocessJob(jobId); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); + JobResult jobResult = job.getResult(); + assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks + failedTasks); + assertThat(jobResult.getFailedCount()).isEqualTo(permanentlyFailedTasks); + assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); + + for (int i = 0, taskNumber = successfulTasks + failedTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { + DummyTaskFailure failure = (DummyTaskFailure) jobResult.getFailures().get(i); + assertThat(failure.getNumber()).isEqualTo(taskNumber); + assertThat(failure.getError()).isEqualTo("error"); + assertThat(failure.isFailAlways()).isTrue(); + } + }); + } + + // todo: job with zero tasks } \ No newline at end of file diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java index 62fd60eaac..dd333c072e 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java @@ -25,7 +25,7 @@ import org.thingsboard.server.dao.entity.EntityDaoService; public interface JobService extends EntityDaoService { - Job createJob(TenantId tenantId, Job job); + Job submitJob(TenantId tenantId, Job job); Job findJobById(TenantId tenantId, JobId jobId); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingJobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingJobConfiguration.java index 797dd0b639..90de047554 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingJobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingJobConfiguration.java @@ -19,17 +19,19 @@ import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.id.CalculatedFieldId; @Data +@EqualsAndHashCode(callSuper = true) @AllArgsConstructor @NoArgsConstructor @Builder -public class CfReprocessingJobConfiguration implements JobConfiguration { +public class CfReprocessingJobConfiguration extends JobConfiguration { @NotNull - private CalculatedField calculatedField; + private CalculatedFieldId calculatedFieldId; private long startTs; private long endTs; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java index 8846a6a12f..3c8b765527 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java @@ -35,9 +35,38 @@ public class CfReprocessingTask extends Task { private long startTs; private long endTs; + @Override + public Object getKey() { + return entityId; + } + + @Override + public TaskFailure toFailure(Throwable error) { + return new CfReprocessingTaskFailure(entityId, error.getMessage()); + } + @Override public JobType getJobType() { return JobType.CF_REPROCESSING; } + @Data + @EqualsAndHashCode(callSuper = true) + @NoArgsConstructor + public static class CfReprocessingTaskFailure extends TaskFailure { + + private EntityId entityId; + + public CfReprocessingTaskFailure(EntityId entityId, String error) { + super(error); + this.entityId = entityId; + } + + @Override + public JobType getJobType() { + return JobType.CF_REPROCESSING; + } + + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java index 70daf6f9c9..62695eb237 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java @@ -18,19 +18,22 @@ package org.thingsboard.server.common.data.job; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import java.util.List; @Data +@EqualsAndHashCode(callSuper = true) @AllArgsConstructor @NoArgsConstructor @Builder -public class DummyJobConfiguration implements JobConfiguration { +public class DummyJobConfiguration extends JobConfiguration { private long taskProcessingTimeMs; private int successfulTasksCount; private int failedTasksCount; + private int permanentlyFailedTasksCount; private List errors; private int retries; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java index ae93ed06bb..ac15dc63cc 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java @@ -33,10 +33,42 @@ public class DummyTask extends Task { private int number; private long processingTimeMs; private List errors; // errors for each attempt + private boolean failAlways; + + @Override + public Object getKey() { + return number; + } + + @Override + public TaskFailure toFailure(Throwable error) { + return new DummyTaskFailure(number, failAlways, error.getMessage()); + } @Override public JobType getJobType() { return JobType.DUMMY; } + @Data + @EqualsAndHashCode(callSuper = true) + @NoArgsConstructor + public static class DummyTaskFailure extends TaskFailure { + + private int number; + private boolean failAlways; + + public DummyTaskFailure(int number, boolean failAlways, String error) { + super(error); + this.number = number; + this.failAlways = failAlways; + } + + @Override + public JobType getJobType() { + return JobType.DUMMY; + } + + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java index b541458289..0d3620d9e8 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobConfiguration.java @@ -19,8 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; import java.io.Serializable; +import java.util.List; @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -28,8 +30,11 @@ import java.io.Serializable; @Type(name = "CF_REPROCESSING", value = CfReprocessingJobConfiguration.class), @Type(name = "DUMMY", value = DummyJobConfiguration.class), }) -public interface JobConfiguration extends Serializable { +@Data +public abstract class JobConfiguration implements Serializable { - JobType getType(); + private List toReprocess; + + public abstract JobType getType(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java index b517877906..748d24811c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java @@ -24,8 +24,8 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") @@ -41,7 +41,7 @@ public abstract class JobResult implements Serializable { private int failedCount; private int discardedCount; private Integer totalCount = null; // set when all tasks are submitted - private Map failures = new HashMap<>(); + private List failures = new ArrayList<>(); private String generalError; private long cancellationTs; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java index 17d050a540..5a4a9e35ee 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.data.job; public enum JobStatus { + QUEUED, PENDING, RUNNING, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/Task.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/Task.java index 5ef735f5d3..6399f6f79a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/Task.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/Task.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.job; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; @@ -38,7 +39,6 @@ public abstract class Task { private TenantId tenantId; private JobId jobId; - private String key; private int retries; public Task() { @@ -46,6 +46,11 @@ public abstract class Task { private int attempt = 0; + @JsonIgnore + public abstract Object getKey(); + + public abstract TaskFailure toFailure(Throwable error); + public abstract JobType getJobType(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskFailure.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskFailure.java new file mode 100644 index 0000000000..1e365c6a8f --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskFailure.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.job; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.thingsboard.server.common.data.job.CfReprocessingTask.CfReprocessingTaskFailure; +import org.thingsboard.server.common.data.job.DummyTask.DummyTaskFailure; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") +@JsonSubTypes({ + @Type(name = "CF_REPROCESSING", value = CfReprocessingTaskFailure.class), + @Type(name = "DUMMY", value = DummyTaskFailure.class) +}) +public abstract class TaskFailure { + + private String error; + + public abstract JobType getJobType(); + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java index 468173bdf3..0ee9a2b477 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java @@ -30,13 +30,4 @@ public class TaskResult { private boolean discarded; private TaskFailure failure; - @Data - @AllArgsConstructor - @NoArgsConstructor - @Builder - public static class TaskFailure { - private String error; - private Task task; - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index f685373f75..f0ba542ab9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.Task; import org.thingsboard.server.common.data.job.TaskResult; -import org.thingsboard.server.common.data.job.TaskResult.TaskFailure; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; @@ -147,10 +146,7 @@ public abstract class TaskProcessor { private void reportFailure(Task task, Throwable error) { TaskResult result = TaskResult.builder() - .failure(TaskFailure.builder() - .error(error.getMessage()) - .task(task) - .build()) + .failure(task.toFailure(error)) .build(); statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index c632a7b58b..f2802f4771 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -30,12 +30,11 @@ import org.thingsboard.server.common.data.job.JobStats; import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.TaskResult; -import org.thingsboard.server.common.data.job.TaskResult.TaskFailure; +import org.thingsboard.server.common.data.job.TaskFailure; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; -import org.thingsboard.server.dao.service.DataValidator; import java.util.Optional; @@ -52,12 +51,13 @@ import static org.thingsboard.server.common.data.job.JobStatus.RUNNING; public class DefaultJobService extends AbstractEntityService implements JobService { private final JobDao jobDao; - private final JobValidator validator = new JobValidator(); @Transactional @Override - public Job createJob(TenantId tenantId, Job job) { - validator.validate(job, Job::getTenantId); + public Job submitJob(TenantId tenantId, Job job) { + if (jobDao.existsByKeyAndStatusOneOf(job.getKey(), QUEUED, PENDING, RUNNING)) { + throw new IllegalArgumentException("The same job is already queued or running"); + } if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) { job.setStatus(QUEUED); } else { @@ -124,10 +124,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi result.setDiscardedCount(result.getDiscardedCount() + 1); } else { TaskFailure failure = taskResult.getFailure(); - String key = failure.getTask().getKey(); result.setFailedCount(result.getFailedCount() + 1); if (result.getFailures().size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures - result.getFailures().put(key, failure.getError()); + result.getFailures().add(failure); } } @@ -192,24 +191,6 @@ public class DefaultJobService extends AbstractEntityService implements JobServi return jobDao.findByIdForUpdate(tenantId, jobId); } -// todo: reprocessing - - public class JobValidator extends DataValidator { - - @Override - protected void validateCreate(TenantId tenantId, Job job) { -// if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) { -// throw new DataValidationException("Job of this type is already running"); -// } - } - - @Override - protected Job validateUpdate(TenantId tenantId, Job job) { - throw new IllegalArgumentException("Job can't be updated externally"); - } - - } - @Override public Optional> findEntity(TenantId tenantId, EntityId entityId) { return Optional.ofNullable(findJobById(tenantId, (JobId) entityId));