diff --git a/application/src/main/java/org/thingsboard/server/controller/JobController.java b/application/src/main/java/org/thingsboard/server/controller/JobController.java index 9719306823..55c68a461f 100644 --- a/application/src/main/java/org/thingsboard/server/controller/JobController.java +++ b/application/src/main/java/org/thingsboard/server/controller/JobController.java @@ -26,6 +26,7 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.thingsboard.rule.engine.api.JobManager; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.job.Job; @@ -36,7 +37,6 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.rule.engine.api.JobManager; import java.util.List; import java.util.UUID; @@ -76,12 +76,18 @@ public class JobController extends BaseController { @Parameter(description = SORT_ORDER_DESCRIPTION) @RequestParam(required = false) String sortOrder, @RequestParam(required = false) List types, - @RequestParam(required = false) List statuses) throws ThingsboardException { + @RequestParam(required = false) List statuses, + @RequestParam(required = false) List entities, + @RequestParam(required = false) Long startTime, + @RequestParam(required = false) Long endTime) throws ThingsboardException { // todo check permissions PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); JobFilter filter = JobFilter.builder() .types(types) .statuses(statuses) + .entities(entities) + .startTime(startTime) + .endTime(endTime) .build(); return jobService.findJobsByFilter(getTenantId(), filter, pageLink); } diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/JobsDeletionTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/JobsDeletionTaskProcessor.java new file mode 100644 index 0000000000..f0f829770a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/processor/JobsDeletionTaskProcessor.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.housekeeper.processor; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; +import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType; +import org.thingsboard.server.dao.job.JobService; + +@Component +@RequiredArgsConstructor +@Slf4j +public class JobsDeletionTaskProcessor extends HousekeeperTaskProcessor { + + private final JobService jobService; + + @Override + public void process(HousekeeperTask task) throws Exception { + int deletedCount = jobService.deleteJobsByEntityId(task.getTenantId(), task.getEntityId()); + log.debug("[{}][{}][{}] Deleted {} jobs", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), deletedCount); + } + + @Override + public HousekeeperTaskType getTaskType() { + return HousekeeperTaskType.DELETE_JOBS; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java index 314581acef..379e72d28b 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java @@ -22,7 +22,6 @@ import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.JobManager; -import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; @@ -32,13 +31,9 @@ import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.Task; import org.thingsboard.server.common.data.job.task.TaskResult; -import org.thingsboard.server.common.data.notification.info.GeneralNotificationInfo; -import org.thingsboard.server.common.data.notification.targets.platform.TenantAdministratorsFilter; -import org.thingsboard.server.common.data.notification.template.NotificationTemplate; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.job.JobService; -import org.thingsboard.server.dao.notification.DefaultNotifications; import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; @@ -63,19 +58,17 @@ public class DefaultJobManager implements JobManager { private final JobService jobService; private final JobStatsService jobStatsService; - private final NotificationCenter notificationCenter; private final PartitionService partitionService; private final TasksQueueConfig queueConfig; private final Map jobProcessors; private final Map>> taskProducers; private final ExecutorService executor; - public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter, - PartitionService partitionService, TaskProducerQueueFactory queueFactory, TasksQueueConfig queueConfig, + public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, PartitionService partitionService, + TaskProducerQueueFactory queueFactory, TasksQueueConfig queueConfig, List jobProcessors) { this.jobService = jobService; this.jobStatsService = jobStatsService; - this.notificationCenter = notificationCenter; this.partitionService = partitionService; this.queueConfig = queueConfig; this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); @@ -105,10 +98,7 @@ public class DefaultJobManager implements JobManager { case COMPLETED, FAILED -> { executor.execute(() -> { try { - if (status == JobStatus.COMPLETED) { - getJobProcessor(job.getType()).onJobCompleted(job); - } - sendJobFinishedNotification(job); + getJobProcessor(job.getType()).onJobFinished(job); } catch (Throwable e) { log.error("Failed to process job update: {}", job, e); } @@ -203,22 +193,6 @@ public class DefaultJobManager implements JobManager { }); } - private void sendJobFinishedNotification(Job job) { - NotificationTemplate template = DefaultNotifications.DefaultNotification.builder() - .name("Job finished") - .subject("${type} task ${status}") - .text("${description} ${status}: ${result}") - .build().toTemplate(); - GeneralNotificationInfo info = new GeneralNotificationInfo(Map.of( - "type", job.getType().getTitle(), - "description", job.getDescription(), - "status", job.getStatus().name().toLowerCase(), - "result", job.getResult().getDescription() - )); - // todo: button to see details (forward to jobs page) - notificationCenter.sendGeneralWebNotification(job.getTenantId(), new TenantAdministratorsFilter(), template, info); - } - private JobProcessor getJobProcessor(JobType jobType) { return jobProcessors.get(jobType); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java index 16ef5e404f..e33878d008 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java @@ -29,7 +29,7 @@ public interface JobProcessor { void reprocess(Job job, List taskFailures, Consumer> taskConsumer) throws Exception; - default void onJobCompleted(Job job) {} + default void onJobFinished(Job job) {} JobType getType(); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index ef658fc17a..b93ff0f623 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -1272,8 +1272,9 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return doGetTypedWithPageLink("/api/jobs?", new TypeReference>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData(); } - protected List findJobs(JobType... types) throws Exception { - return doGetTypedWithPageLink("/api/jobs?types=" + Arrays.stream(types).map(Enum::name).collect(Collectors.joining(",")) + "&", + protected List findJobs(List types, List entities) throws Exception { + return doGetTypedWithPageLink("/api/jobs?types=" + types.stream().map(Enum::name).collect(Collectors.joining(",")) + + "&entities=" + entities.stream().map(UUID::toString).collect(Collectors.joining(",")) + "&", new TypeReference>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData(); } diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index dd19ddcea1..14c0e4e847 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.job; -import org.assertj.core.api.ThrowingConsumer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -24,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; import org.thingsboard.rule.engine.api.JobManager; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.DummyJobConfiguration; @@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.DummyTaskResult; import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFailure; -import org.thingsboard.server.common.data.notification.Notification; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.dao.job.JobDao; @@ -53,6 +52,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @DaoSqlTest @TestPropertySource(properties = { @@ -72,9 +72,14 @@ public class JobManagerTest extends AbstractControllerTest { @Autowired private JobDao jobDao; + private TenantId tenantId; + private Device jobEntity; + @Before public void setUp() throws Exception { loginTenantAdmin(); + tenantId = super.tenantId; + jobEntity = createDevice("Test", "Test"); } @After @@ -84,15 +89,9 @@ public class JobManagerTest extends AbstractControllerTest { @Test public void testSubmitJob_allTasksSuccessful() { int tasksCount = 5; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("Test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(tasksCount) - .taskProcessingTimeMs(1000) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(1000) .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -108,29 +107,18 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getResult().getResults()).isEmpty(); assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); }); - - checkJobNotification(notification -> { - assertThat(notification.getSubject()).isEqualTo("Dummy job task completed"); - assertThat(notification.getText()).isEqualTo("Test job completed: 5/5 successful, 0 failed"); - }); } @Test public void testSubmitJob_someTasksPermanentlyFailed() { int successfulTasks = 3; int failedTasks = 2; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("Test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(successfulTasks) - .failedTasksCount(failedTasks) - .errors(List.of("error1", "error2", "error3")) - .retries(2) - .taskProcessingTimeMs(100) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(successfulTasks) + .failedTasksCount(failedTasks) + .errors(List.of("error1", "error2", "error3")) + .retries(2) + .taskProcessingTimeMs(100) .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -145,24 +133,13 @@ public class JobManagerTest extends AbstractControllerTest { }); assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount()); }); - - checkJobNotification(notification -> { - assertThat(notification.getSubject()).isEqualTo("Dummy job task failed"); - assertThat(notification.getText()).isEqualTo("Test job failed: 3/5 successful, 2 failed"); - }); } @Test public void testSubmitJob_taskTimeout() { - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("Test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(1) - .taskProcessingTimeMs(5000) // bigger than DummyTaskProcessor.getTaskProcessingTimeout() - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(1) + .taskProcessingTimeMs(5000) // bigger than DummyTaskProcessor.getTaskProcessingTimeout() .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -177,15 +154,9 @@ public class JobManagerTest extends AbstractControllerTest { @Test public void testCancelJob_whileRunning() throws Exception { int tasksCount = 100; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(tasksCount) - .taskProcessingTimeMs(100) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(100) .build()).getId(); Thread.sleep(500); @@ -203,15 +174,9 @@ public class JobManagerTest extends AbstractControllerTest { @Test public void testCancelJob_simulateTaskProcessorRestart() throws Exception { int tasksCount = 10; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(tasksCount) - .taskProcessingTimeMs(500) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(500) .build()).getId(); // simulate cancelled jobs are forgotten @@ -239,16 +204,10 @@ public class JobManagerTest extends AbstractControllerTest { loginSysAdmin(); createDifferentTenant(); - TenantId tenantId = this.differentTenantId; - jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(1000) - .taskProcessingTimeMs(500) - .build()) + this.tenantId = this.differentTenantId; + submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(1000) + .taskProcessingTimeMs(500) .build()); Thread.sleep(2000); @@ -261,25 +220,18 @@ public class JobManagerTest extends AbstractControllerTest { } @Test - public void testSubmitMultipleJobs() { + public void testSubmitMultipleJobs() throws Exception { int tasksCount = 3; int jobsCount = 3; for (int i = 1; i <= jobsCount; i++) { - Job job = Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job-" + i) - .description("test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(tasksCount) - .taskProcessingTimeMs(1000) - .build()) - .build(); - jobManager.submitJob(job); + submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(1000) + .build(), "test-job-" + i); } await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { - List jobs = findJobs(JobType.DUMMY); + List jobs = findJobs(List.of(JobType.DUMMY), List.of(jobEntity.getUuidId())); assertThat(jobs).hasSize(jobsCount); Job firstJob = jobs.get(2); // ordered by createdTime descending assertThat(firstJob.getStatus()).isEqualTo(JobStatus.RUNNING); @@ -297,6 +249,11 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); } }); + + doDelete("/api/device/" + jobEntity.getId()).andExpect(status().isOk()); + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(findJobs(List.of(JobType.DUMMY), List.of(jobEntity.getUuidId()))).isEmpty(); + }); } @Test @@ -305,17 +262,11 @@ public class JobManagerTest extends AbstractControllerTest { int jobsCount = 3; List jobIds = new ArrayList<>(); for (int i = 1; i <= jobsCount; i++) { - Job job = Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job-" + i) - .description("test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(tasksCount) - .taskProcessingTimeMs(1000) - .build()) - .build(); - jobIds.add(jobManager.submitJob(job).getId()); + Job job = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(tasksCount) + .taskProcessingTimeMs(1000) + .build(), "test-job-" + i); + jobIds.add(job.getId()); } for (int i = 1; i < jobIds.size(); i++) { @@ -343,16 +294,10 @@ public class JobManagerTest extends AbstractControllerTest { @Test public void testSubmitJob_generalError() { int submittedTasks = 100; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("Test job") - .configuration(DummyJobConfiguration.builder() - .generalError("Some error while submitting tasks") - .submittedTasksBeforeGeneralError(submittedTasks) - .taskProcessingTimeMs(10) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .generalError("Some error while submitting tasks") + .submittedTasksBeforeGeneralError(submittedTasks) + .taskProcessingTimeMs(10) .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -362,24 +307,13 @@ public class JobManagerTest extends AbstractControllerTest { assertThat(job.getResult().getDiscardedCount()).isZero(); assertThat(job.getResult().getTotalCount()).isNull(); }); - - checkJobNotification(notification -> { - assertThat(notification.getSubject()).isEqualTo("Dummy job task failed"); - assertThat(notification.getText()).isEqualTo("Test job failed: Some error while submitting tasks"); - }); } @Test public void testSubmitJob_immediateGeneralError() { - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("Test job") - .configuration(DummyJobConfiguration.builder() - .generalError("Some error while submitting tasks") - .submittedTasksBeforeGeneralError(0) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .generalError("Some error while submitting tasks") + .submittedTasksBeforeGeneralError(0) .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -395,16 +329,10 @@ public class JobManagerTest extends AbstractControllerTest { @Test public void testReprocessJob_generalError() throws Exception { int submittedTasks = 100; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("Test job") - .configuration(DummyJobConfiguration.builder() - .generalError("Some error while submitting tasks") - .submittedTasksBeforeGeneralError(submittedTasks) - .taskProcessingTimeMs(10) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .generalError("Some error while submitting tasks") + .submittedTasksBeforeGeneralError(submittedTasks) + .taskProcessingTimeMs(10) .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -437,17 +365,11 @@ public class JobManagerTest extends AbstractControllerTest { int successfulTasks = 3; int failedTasks = 2; int totalTasksCount = successfulTasks + failedTasks; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(successfulTasks) - .failedTasksCount(failedTasks) - .errors(List.of("error")) - .taskProcessingTimeMs(100) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(successfulTasks) + .failedTasksCount(failedTasks) + .errors(List.of("error")) + .taskProcessingTimeMs(100) .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -484,18 +406,12 @@ public class JobManagerTest extends AbstractControllerTest { int failedTasks = 2; int permanentlyFailedTasks = 1; int totalTasksCount = successfulTasks + failedTasks + permanentlyFailedTasks; - JobId jobId = jobManager.submitJob(Job.builder() - .tenantId(tenantId) - .type(JobType.DUMMY) - .key("test-job") - .description("test job") - .configuration(DummyJobConfiguration.builder() - .successfulTasksCount(successfulTasks) - .failedTasksCount(failedTasks) - .permanentlyFailedTasksCount(permanentlyFailedTasks) - .errors(List.of("error")) - .taskProcessingTimeMs(100) - .build()) + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(successfulTasks) + .failedTasksCount(failedTasks) + .permanentlyFailedTasksCount(permanentlyFailedTasks) + .errors(List.of("error")) + .taskProcessingTimeMs(100) .build()).getId(); await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { @@ -534,14 +450,18 @@ public class JobManagerTest extends AbstractControllerTest { }); } - private void checkJobNotification(ThrowingConsumer assertFunction) { - await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { - Notification notification = getMyNotifications(true, 100).stream() - .findFirst().orElse(null); - assertThat(notification).isNotNull(); + private Job submitJob(DummyJobConfiguration configuration) { + return submitJob(configuration, "test-job"); + } - assertFunction.accept(notification); - }); + private Job submitJob(DummyJobConfiguration configuration, String key) { + return jobManager.submitJob(Job.builder() + .tenantId(tenantId) + .type(JobType.DUMMY) + .key(key) + .entityId(jobEntity.getId()) + .configuration(configuration) + .build()); } private List getFailures(JobResult jobResult) { diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java index 7d4c68f6a4..4f3f9548d8 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.job; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; @@ -42,4 +43,6 @@ public interface JobService extends EntityDaoService { void deleteJob(TenantId tenantId, JobId jobId); + int deleteJobsByEntityId(TenantId tenantId, EntityId entityId); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTask.java index ed02d22a74..2d88cef037 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTask.java @@ -85,6 +85,10 @@ public class HousekeeperTask implements Serializable { return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_CALCULATED_FIELDS); } + public static HousekeeperTask deleteJobs(TenantId tenantId, EntityId entityId) { + return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_JOBS); + } + @JsonIgnore public String getDescription() { return taskType.getDescription() + " for " + entityId.getEntityType().getNormalName().toLowerCase() + " " + entityId.getId(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTaskType.java b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTaskType.java index ef217debc3..e84642b599 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTaskType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/HousekeeperTaskType.java @@ -31,7 +31,8 @@ public enum HousekeeperTaskType { UNASSIGN_ALARMS("alarms unassigning"), DELETE_TENANT_ENTITIES("tenant entities deletion"), DELETE_ENTITIES("entities deletion"), - DELETE_CALCULATED_FIELDS("calculated fields deletion"); + DELETE_CALCULATED_FIELDS("calculated fields deletion"), + DELETE_JOBS("jobs deletion"); private final String description; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java index 3a9aabae76..031a733d51 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobResult.java @@ -17,14 +17,6 @@ package org.thingsboard.server.common.data.job; public class DummyJobResult extends JobResult { - @Override - public String getDescription() { - if (getGeneralError() != null) { - return getGeneralError(); - } - return getSuccessfulCount() + "/" + getTotalCount() + " successful, " + getFailedCount() + " failed"; - } - @Override public JobType getJobType() { return JobType.DUMMY; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java index f914067be3..3cfb55b388 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java @@ -24,10 +24,13 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.ToString; import org.thingsboard.server.common.data.BaseData; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasTenantId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; +import java.util.Set; import java.util.UUID; @Data @@ -42,8 +45,8 @@ public class Job extends BaseData implements HasTenantId { private JobType type; @NotBlank private String key; - @NotBlank - private String description; + @NotNull + private EntityId entityId; @NotNull private JobStatus status; @NotNull @@ -52,12 +55,16 @@ public class Job extends BaseData implements HasTenantId { @NotNull private JobResult result; + public static final Set SUPPORTED_ENTITY_TYPES = Set.of( + EntityType.DEVICE, EntityType.ASSET, EntityType.DEVICE_PROFILE, EntityType.ASSET_PROFILE + ); + @Builder(toBuilder = true) - public Job(TenantId tenantId, JobType type, String key, String description, JobConfiguration configuration) { + public Job(TenantId tenantId, JobType type, String key, EntityId entityId, JobConfiguration configuration) { this.tenantId = tenantId; this.type = type; this.key = key; - this.description = description; + this.entityId = entityId; this.configuration = configuration; this.configuration.setTasksKey(UUID.randomUUID().toString()); presetResult(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobFilter.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobFilter.java index 6cc9a636e8..1a678ba34b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobFilter.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobFilter.java @@ -19,6 +19,7 @@ import lombok.Builder; import lombok.Data; import java.util.List; +import java.util.UUID; @Data @Builder @@ -26,5 +27,8 @@ public class JobFilter { private final List types; private final List statuses; + private final List entities; + private final Long startTime; + private final Long endTime; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java index 285143dfa4..bd2c73d0d8 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java @@ -66,9 +66,6 @@ public abstract class JobResult implements Serializable { } } - @JsonIgnore - public abstract String getDescription(); - @JsonIgnore public abstract JobType getJobType(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java index 1ca2973936..0340cd0fde 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/housekeeper/CleanUpService.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient; import org.thingsboard.server.dao.eventsourcing.ActionCause; import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; @@ -76,6 +77,9 @@ public class CleanUpService { submitTask(HousekeeperTask.deleteEvents(tenantId, entityId)); submitTask(HousekeeperTask.deleteAlarms(tenantId, entityId)); submitTask(HousekeeperTask.deleteCalculatedFields(tenantId, entityId)); + if (Job.SUPPORTED_ENTITY_TYPES.contains(entityId.getEntityType())) { + submitTask(HousekeeperTask.deleteJobs(tenantId, entityId)); + } } public void removeTenantEntities(TenantId tenantId, EntityType... entityTypes) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index cd001b61b4..5afac21e5a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -157,6 +157,10 @@ public class DefaultJobService extends AbstractEntityService implements JobServi private Job saveJob(TenantId tenantId, Job job, boolean publishEvent, JobStatus prevStatus) { ConstraintValidator.validateFields(job); + if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) { + throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType()); + } + job = jobDao.save(tenantId, job); if (publishEvent) { eventPublisher.publishEvent(SaveEntityEvent.builder() @@ -203,6 +207,11 @@ public class DefaultJobService extends AbstractEntityService implements JobServi jobDao.removeById(tenantId, jobId.getId()); } + @Override + public int deleteJobsByEntityId(TenantId tenantId, EntityId entityId) { // TODO: cancel all jobs for this entity + return jobDao.removeByEntityId(tenantId, entityId); + } + private Job findForUpdate(TenantId tenantId, JobId jobId) { return jobDao.findByIdForUpdate(tenantId, jobId); } @@ -219,7 +228,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi @Override public void deleteByTenantId(TenantId tenantId) { - jobDao.deleteByTenantId(tenantId); + jobDao.removeByTenantId(tenantId); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java b/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java index 46cc70aea8..498fc9f2de 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.job; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; @@ -37,8 +38,12 @@ public interface JobDao extends Dao { boolean existsByTenantIdAndTypeAndStatusOneOf(TenantId tenantId, JobType type, JobStatus... statuses); + boolean existsByTenantIdAndEntityIdAndStatusOneOf(TenantId tenantId, EntityId entityId, JobStatus... statuses); + Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status); - void deleteByTenantId(TenantId tenantId); + void removeByTenantId(TenantId tenantId); + + int removeByEntityId(TenantId tenantId, EntityId entityId); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 707dcdc3a5..23324eccb5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -745,7 +745,8 @@ public class ModelConstants { public static final String JOB_TABLE_NAME = "job"; public static final String JOB_TYPE_PROPERTY = "type"; public static final String JOB_KEY_PROPERTY = "key"; - public static final String JOB_DESCRIPTION_PROPERTY = "description"; + public static final String JOB_ENTITY_ID_PROPERTY = "entity_id"; + public static final String JOB_ENTITY_TYPE_PROPERTY = "entity_type"; public static final String JOB_STATUS_PROPERTY = "status"; public static final String JOB_CONFIGURATION_PROPERTY = "configuration"; public static final String JOB_RESULT_PROPERTY = "result"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java index 498541962b..16d7d33edc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java @@ -25,6 +25,8 @@ import jakarta.persistence.Table; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobConfiguration; @@ -54,8 +56,12 @@ public class JobEntity extends BaseSqlEntity { @Column(name = ModelConstants.JOB_KEY_PROPERTY, nullable = false) private String key; - @Column(name = ModelConstants.JOB_DESCRIPTION_PROPERTY, nullable = false) - private String description; + @Column(name = ModelConstants.JOB_ENTITY_ID_PROPERTY, nullable = false) + private UUID entityId; + + @Enumerated(EnumType.STRING) + @Column(name = ModelConstants.JOB_ENTITY_TYPE_PROPERTY, nullable = false) + private EntityType entityType; @Enumerated(EnumType.STRING) @Column(name = ModelConstants.JOB_STATUS_PROPERTY, nullable = false) @@ -74,7 +80,8 @@ public class JobEntity extends BaseSqlEntity { this.tenantId = getTenantUuid(job.getTenantId()); this.type = job.getType(); this.key = job.getKey(); - this.description = job.getDescription(); + this.entityId = job.getEntityId().getId(); + this.entityType = job.getEntityId().getEntityType(); this.status = job.getStatus(); this.configuration = toJson(job.getConfiguration()); this.result = toJson(job.getResult()); @@ -88,7 +95,7 @@ public class JobEntity extends BaseSqlEntity { job.setTenantId(getTenantId(tenantId)); job.setType(type); job.setKey(key); - job.setDescription(description); + job.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId)); job.setStatus(status); job.setConfiguration(fromJson(configuration, JobConfiguration.class)); job.setResult(fromJson(result, JobResult.class)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java index 72d569c94b..df391be1b1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.sql.job; +import org.springframework.data.domain.Limit; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; @@ -34,26 +35,34 @@ import java.util.UUID; public interface JobRepository extends JpaRepository { @Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId " + - "AND (:types IS NULL OR j.type IN (:types)) AND (:statuses IS NULL OR j.status IN (:statuses)) " + - "AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true " + - "OR ilike(j.description, concat('%', :searchText, '%')) = true)") - Page findByTenantIdAndTypesAndStatusesAndSearchText(@Param("tenantId") UUID tenantId, - @Param("types") List types, - @Param("statuses") List statuses, - @Param("searchText") String searchText, - Pageable pageable); + "AND (:types IS NULL OR j.type IN (:types)) " + + "AND (:statuses IS NULL OR j.status IN (:statuses)) " + + "AND (:entities IS NULL OR j.entityId IN :entities) " + + "AND (:startTime <= 0 OR j.createdTime >= :startTime) " + + "AND (:endTime <= 0 OR j.createdTime <= :endTime) " + + "AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true)") + Page findByTenantIdAndTypesAndStatusesAndEntitiesAndTimeAndSearchText(@Param("tenantId") UUID tenantId, + @Param("types") List types, + @Param("statuses") List statuses, + @Param("entities") List entities, + @Param("startTime") long startTime, + @Param("endTime") long endTime, + @Param("searchText") String searchText, + Pageable pageable); @Query(value = "SELECT * FROM job j WHERE j.id = :id FOR UPDATE", nativeQuery = true) JobEntity findByIdForUpdate(UUID id); @Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId AND j.key = :key " + "ORDER BY j.createdTime DESC") - JobEntity findLatestByTenantIdAndKey(@Param("tenantId") UUID tenantId, @Param("key") String key); + JobEntity findLatestByTenantIdAndKey(@Param("tenantId") UUID tenantId, @Param("key") String key, Limit limit); boolean existsByTenantIdAndKeyAndStatusIn(UUID tenantId, String key, List statuses); boolean existsByTenantIdAndTypeAndStatusIn(UUID tenantId, JobType type, List statuses); + boolean existsByTenantIdAndEntityIdAndStatusIn(UUID tenantId, UUID entityId, List statuses); + @Query(value = "SELECT * FROM job j WHERE j.tenant_id = :tenantId AND j.type = :type " + "AND j.status = :status ORDER BY j.created_time ASC, j.id ASC LIMIT 1 FOR UPDATE", nativeQuery = true) JobEntity findOldestByTenantIdAndTypeAndStatusForUpdate(UUID tenantId, String type, String status); @@ -63,4 +72,9 @@ public interface JobRepository extends JpaRepository { @Query("DELETE FROM JobEntity j WHERE j.tenantId = :tenantId") void deleteByTenantId(UUID tenantId); + @Transactional + @Modifying + @Query("DELETE FROM JobEntity j WHERE j.entityId = :entityId") + int deleteByEntityId(UUID entityId); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java index 5c9fe230e8..40d5177ad6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java @@ -17,9 +17,11 @@ package org.thingsboard.server.dao.sql.job; import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.Limit; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.Job; @@ -47,9 +49,12 @@ public class JpaJobDao extends JpaAbstractDao implements JobDao @Override public PageData findByTenantIdAndFilter(TenantId tenantId, JobFilter filter, PageLink pageLink) { - return DaoUtil.toPageData(jobRepository.findByTenantIdAndTypesAndStatusesAndSearchText(tenantId.getId(), + return DaoUtil.toPageData(jobRepository.findByTenantIdAndTypesAndStatusesAndEntitiesAndTimeAndSearchText(tenantId.getId(), CollectionsUtil.isEmpty(filter.getTypes()) ? null : filter.getTypes(), CollectionsUtil.isEmpty(filter.getStatuses()) ? null : filter.getStatuses(), + CollectionsUtil.isEmpty(filter.getEntities()) ? null : filter.getEntities(), + filter.getStartTime() != null ? filter.getStartTime() : 0, + filter.getEndTime() != null ? filter.getEndTime() : 0, Strings.emptyToNull(pageLink.getTextSearch()), DaoUtil.toPageable(pageLink))); } @@ -60,7 +65,7 @@ public class JpaJobDao extends JpaAbstractDao implements JobDao @Override public Job findLatestByTenantIdAndKey(TenantId tenantId, String key) { - return DaoUtil.getData(jobRepository.findLatestByTenantIdAndKey(tenantId.getId(), key)); + return DaoUtil.getData(jobRepository.findLatestByTenantIdAndKey(tenantId.getId(), key, Limit.of(1))); } @Override @@ -73,16 +78,26 @@ public class JpaJobDao extends JpaAbstractDao implements JobDao return jobRepository.existsByTenantIdAndTypeAndStatusIn(tenantId.getId(), type, Arrays.stream(statuses).toList()); } + @Override + public boolean existsByTenantIdAndEntityIdAndStatusOneOf(TenantId tenantId, EntityId entityId, JobStatus... statuses) { + return jobRepository.existsByTenantIdAndEntityIdAndStatusIn(tenantId.getId(), entityId.getId(), Arrays.stream(statuses).toList()); + } + @Override public Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status) { return DaoUtil.getData(jobRepository.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId.getId(), type.name(), status.name())); } @Override - public void deleteByTenantId(TenantId tenantId) { + public void removeByTenantId(TenantId tenantId) { jobRepository.deleteByTenantId(tenantId.getId()); } + @Override + public int removeByEntityId(TenantId tenantId, EntityId entityId) { + return jobRepository.deleteByEntityId(entityId.getId()); + } + @Override public EntityType getEntityType() { return EntityType.JOB; diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 62ce86a76c..f2a0bc26c1 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -955,7 +955,8 @@ CREATE TABLE IF NOT EXISTS job ( tenant_id uuid NOT NULL, type varchar NOT NULL, key varchar NOT NULL, - description varchar NOT NULL, + entity_id uuid NOT NULL, + entity_type varchar NOT NULL, status varchar NOT NULL, configuration varchar NOT NULL, result varchar diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java index 3ee29dd7c0..89434b20cf 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/JobManager.java @@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.job.Job; public interface JobManager { - Job submitJob(Job job); + Job submitJob(Job job); // TODO: rate limits void cancelJob(TenantId tenantId, JobId jobId);