Browse Source

Multiple queued jobs of the same type

pull/13286/head
ViacheslavKlimov 1 year ago
parent
commit
60fdb7df3d
  1. 2
      application/src/main/java/org/thingsboard/server/controller/JobController.java
  2. 7
      application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java
  3. 7
      application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java
  4. 48
      application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java
  5. 11
      application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java
  6. 2
      application/src/main/java/org/thingsboard/server/service/job/JobManager.java
  7. 8
      application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java
  8. 119
      application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java
  9. 2
      application/src/test/resources/logback-test.xml
  10. 4
      common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java
  11. 2
      common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java
  12. 3
      common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java
  13. 2
      common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java
  14. 3
      common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java
  15. 5
      common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java
  16. 16
      common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java
  17. 2
      common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java
  18. 2
      common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java
  19. 16
      common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java
  20. 119
      dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java
  21. 4
      dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java
  22. 49
      dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java
  23. 10
      dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java

2
application/src/main/java/org/thingsboard/server/controller/JobController.java

@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.task.JobService;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.job.JobManager;

7
application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java

@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
@ -59,6 +60,7 @@ import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.service.job.JobManager;
import java.util.Set;
@ -70,6 +72,7 @@ public class EntityStateSourcingListener {
private final TenantService tenantService;
private final TbClusterService tbClusterService;
private final EdgeSynchronizationManager edgeSynchronizationManager;
private final JobManager jobManager;
@PostConstruct
public void init() {
@ -300,7 +303,9 @@ public class EntityStateSourcingListener {
}
private void onJobUpdate(Job job) {
if (job.getResult().getCancellationTs() > 0) {
jobManager.onJobUpdate(job);
if (job.getResult().getCancellationTs() > 0 || job.getStatus().isOneOf(JobStatus.FAILED)) {
// task processors will add this job to the list of discarded
tbClusterService.broadcastEntityStateChangeEvent(job.getTenantId(), job.getId(), ComponentLifecycleEvent.STOPPED);
}
}

7
application/src/main/java/org/thingsboard/server/service/job/CfReprocessingJobProcessor.java

@ -17,7 +17,6 @@ package org.thingsboard.server.service.job;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.id.AssetProfileId;
@ -36,15 +35,13 @@ import java.util.function.Consumer;
@Component
@RequiredArgsConstructor
public class CfReprocessingJobProcessor extends JobProcessor {
public class CfReprocessingJobProcessor implements JobProcessor {
private final DeviceService deviceService;
private final AssetService assetService;
// fixme: multiple jobs with single type
@Transactional
@Override
public int process(Job job, Consumer<Task> taskConsumer) {
public int process(Job job, Consumer<Task> taskConsumer) throws Exception {
CfReprocessingJobConfiguration configuration = job.getConfiguration();
CalculatedField calculatedField = configuration.getCalculatedField();

48
application/src/main/java/org/thingsboard/server/service/job/DefaultJobManager.java

@ -18,19 +18,22 @@ package org.thingsboard.server.service.job;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobStats;
import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task;
import org.thingsboard.server.common.data.job.TaskResult;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.task.JobService;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueCallback;
@ -64,6 +67,7 @@ public class DefaultJobManager implements JobManager {
private final Map<JobType, JobProcessor> jobProcessors;
private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers;
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer;
private final ExecutorService executor;
private final ExecutorService consumerExecutor;
@Value("${queue.tasks.stats.processing_interval_ms:5000}")
@ -74,6 +78,7 @@ public class DefaultJobManager implements JobManager {
this.jobStatsService = jobStatsService;
this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity()));
this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer));
this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass());
this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("job-stats-consumer"));
this.jobStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder()
.name("job-stats")
@ -92,22 +97,40 @@ public class DefaultJobManager implements JobManager {
@Override
public Job submitJob(Job job) {
job = jobService.createJob(job.getTenantId(), job);
log.info("Submitting job: {}", job);
log.debug("Submitting job: {}", job);
return jobService.createJob(job.getTenantId(), job);
}
int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask);
jobStatsService.reportAllTasksSubmitted(job.getTenantId(), job.getId(), tasksCount);
return job;
@Override
public void onJobUpdate(Job job) {
if (job.getStatus() == JobStatus.PENDING) {
executor.execute(() -> {
TenantId tenantId = job.getTenantId();
JobId jobId = job.getId();
try {
int tasksCount = jobProcessors.get(job.getType()).process(job, this::submitTask); // todo: think about stopping tb - while tasks are being submitted
log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount);
jobStatsService.reportAllTasksSubmitted(tenantId, jobId, tasksCount);
} catch (Throwable e) {
log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e);
try {
jobService.markAsFailed(tenantId, jobId, ExceptionUtils.getStackTrace(e));
} catch (Throwable e2) {
log.error("[{}][{}] Failed to mark job as failed", tenantId, jobId, e2);
}
}
});
}
}
@Override
public void cancelJob(TenantId tenantId, JobId jobId) {
log.info("Cancelling job: {}", jobId);
log.info("[{}][{}] Cancelling job", tenantId, jobId);
jobService.cancelJob(tenantId, jobId);
}
private void submitTask(Task task) {
log.info("Submitting task: {}", task);
log.info("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task);
TaskProto taskProto = TaskProto.newBuilder()
.setValue(JacksonUtil.toString(task))
.build();
@ -147,22 +170,23 @@ public class DefaultJobManager implements JobManager {
}
stats.forEach((jobId, jobStats) -> {
TenantId tenantId = jobStats.getTenantId();
try {
TenantId tenantId = jobStats.getTenantId();
log.info("[{}][{}] Processing job stats: {}", tenantId, jobId, stats);
log.debug("[{}][{}] Processing job stats: {}", tenantId, jobId, stats);
jobService.processStats(tenantId, jobId, jobStats);
} catch (Exception e) {
log.warn("Failed to process job stats for {}: {}", jobId, jobStats, e);
log.error("[{}][{}] Failed to process job stats: {}", tenantId, jobId, jobStats, e);
}
});
consumer.commit();
Thread.sleep(statsProcessingInterval);
Thread.sleep(statsProcessingInterval); // todo: test with bigger interval
}
@PreDestroy
private void destroy() {
jobStatsConsumer.stop();
executor.shutdownNow();
consumerExecutor.shutdownNow();
}

11
application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java

@ -28,11 +28,18 @@ import java.util.function.Consumer;
@Component
@RequiredArgsConstructor
public class DummyJobProcessor extends JobProcessor {
public class DummyJobProcessor implements JobProcessor {
@Override
public int process(Job job, Consumer<Task> taskConsumer) {
public int process(Job job, Consumer<Task> taskConsumer) throws Exception {
DummyJobConfiguration configuration = job.getConfiguration();
if (configuration.getGeneralError() != null) {
for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) {
taskConsumer.accept(createTask(job, configuration, number, null));
}
Thread.sleep(configuration.getTaskProcessingTimeMs() * (configuration.getSubmittedTasksBeforeGeneralError() / 2)); // sleeping so that some tasks are processed
throw new RuntimeException(configuration.getGeneralError());
}
for (int number = 1; number <= configuration.getSuccessfulTasksCount(); number++) {
taskConsumer.accept(createTask(job, configuration, number, null));
}

2
application/src/main/java/org/thingsboard/server/service/job/JobManager.java

@ -25,4 +25,6 @@ public interface JobManager {
void cancelJob(TenantId tenantId, JobId jobId);
void onJobUpdate(Job job);
}

8
application/src/main/java/org/thingsboard/server/service/job/JobProcessor.java

@ -15,16 +15,16 @@
*/
package org.thingsboard.server.service.job;
import org.thingsboard.server.common.data.job.Task;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.Task;
import java.util.function.Consumer;
public abstract class JobProcessor {
public interface JobProcessor {
public abstract int process(Job job, Consumer<Task> taskConsumer);
int process(Job job, Consumer<Task> taskConsumer) throws Exception;
public abstract JobType getType();
JobType getType();
}

119
application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java

@ -33,12 +33,14 @@ import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.dao.task.JobService;
import org.thingsboard.server.queue.task.JobStatsService;
import org.thingsboard.server.service.job.task.DummyTaskProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -157,7 +159,7 @@ public class JobManagerTest extends AbstractControllerTest {
Job job = findJobById(jobId);
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED);
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getDiscardedCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount);
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
});
@ -184,15 +186,14 @@ public class JobManagerTest extends AbstractControllerTest {
inv.callRealMethod();
}
return null;
}).when(taskProcessor).addToCancelledJobs(any()); // ignoring cancellation event,
}).when(taskProcessor).addToDiscardedJobs(any()); // ignoring cancellation event,
jobManager.cancelJob(tenantId, jobId);
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
Job job = findJobById(jobId);
System.err.println(job);
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED);
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getCancelledCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getDiscardedCount()).isBetween(1, tasksCount - 1);
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount);
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount);
});
@ -224,12 +225,118 @@ public class JobManagerTest extends AbstractControllerTest {
Assertions.assertThat(jobService.findJobsByTenantId(tenantId, new PageLink(100, 0)).getData()).isEmpty();
}
@Test
public void testSubmitMultipleJobs() {
int tasksCount = 3;
int jobsCount = 3;
for (int i = 1; i <= jobsCount; i++) {
Job job = Job.builder()
.tenantId(tenantId)
.type(JobType.DUMMY)
.key("test-job-" + i)
.description("test job")
.configuration(DummyJobConfiguration.builder()
.successfulTasksCount(tasksCount)
.taskProcessingTimeMs(1000)
.build())
.build();
jobManager.submitJob(job);
}
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
List<Job> jobs = findJobs();
assertThat(jobs).hasSize(jobsCount);
Job firstJob = jobs.get(2); // ordered by createdTime descending
assertThat(firstJob.getStatus()).isEqualTo(JobStatus.RUNNING);
Job secondJob = jobs.get(1);
assertThat(secondJob.getStatus()).isEqualTo(JobStatus.QUEUED);
Job thirdJob = jobs.get(0);
assertThat(thirdJob.getStatus()).isEqualTo(JobStatus.QUEUED);
});
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
List<Job> jobs = findJobs();
for (Job job : jobs) {
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED);
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount);
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount);
}
});
}
@Test
public void testCancelQueuedJob() {
int tasksCount = 3;
int jobsCount = 3;
List<JobId> jobIds = new ArrayList<>();
for (int i = 1; i <= jobsCount; i++) {
Job job = Job.builder()
.tenantId(tenantId)
.type(JobType.DUMMY)
.key("test-job-" + i)
.description("test job")
.configuration(DummyJobConfiguration.builder()
.successfulTasksCount(tasksCount)
.taskProcessingTimeMs(1000)
.build())
.build();
jobIds.add(jobManager.submitJob(job).getId());
}
for (int i = 1; i < jobIds.size(); i++) {
jobManager.cancelJob(tenantId, jobIds.get(i));
}
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
List<Job> jobs = findJobs();
Job firstJob = jobs.get(2);
assertThat(firstJob.getStatus()).isEqualTo(JobStatus.COMPLETED);
assertThat(firstJob.getResult().getSuccessfulCount()).isEqualTo(tasksCount);
assertThat(firstJob.getResult().getTotalCount()).isEqualTo(tasksCount);
Job secondJob = jobs.get(1);
assertThat(secondJob.getStatus()).isEqualTo(JobStatus.CANCELLED);
assertThat(secondJob.getResult().getCompletedCount()).isZero();
Job thirdJob = jobs.get(0);
assertThat(thirdJob.getStatus()).isEqualTo(JobStatus.CANCELLED);
assertThat(thirdJob.getResult().getCompletedCount()).isZero();
});
}
@Test
public void testGeneralJobError() {
int submittedTasks = 100;
JobId jobId = jobManager.submitJob(Job.builder()
.tenantId(tenantId)
.type(JobType.DUMMY)
.key("test-job")
.description("test job")
.configuration(DummyJobConfiguration.builder()
.generalError("Some error while submitting tasks")
.submittedTasksBeforeGeneralError(submittedTasks)
.taskProcessingTimeMs(10)
.build())
.build()).getId();
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
Job job = findJobById(jobId);
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED);
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, submittedTasks);
assertThat(job.getResult().getDiscardedCount()).isBetween(1, submittedTasks);
assertThat(job.getResult().getTotalCount()).isNull();
});
}
// todo: job with zero tasks, reprocessing
private Job findJobById(JobId jobId) throws Exception {
return doGet("/api/job/" + jobId, Job.class);
}
private List<Job> findJobs() throws Exception {
return doGetTypedWithPageLink("/api/jobs?", new TypeReference<PageData<Job>>() {}, new PageLink(100, 0)).getData();
return doGetTypedWithPageLink("/api/jobs?", new TypeReference<PageData<Job>>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData();
}
}

2
application/src/test/resources/logback-test.xml

@ -9,7 +9,7 @@
<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
<logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/>
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.thingsboard.server" level="WARN"/>
<logger name="org.springframework" level="WARN"/>
<logger name="org.springframework.boot.test" level="WARN"/>
<logger name="org.apache.cassandra" level="WARN"/>

4
common/dao-api/src/main/java/org/thingsboard/server/dao/task/JobService.java → common/dao-api/src/main/java/org/thingsboard/server/dao/job/JobService.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.task;
package org.thingsboard.server.dao.job;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
@ -31,6 +31,8 @@ public interface JobService extends EntityDaoService {
void cancelJob(TenantId tenantId, JobId jobId);
void markAsFailed(TenantId tenantId, JobId jobId, String error);
void processStats(TenantId tenantId, JobId jobId, JobStats jobStats);
PageData<Job> findJobsByTenantId(TenantId tenantId, PageLink pageLink);

2
common/data/src/main/java/org/thingsboard/server/common/data/job/CfReprocessingTask.java

@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.id.EntityId;
@ -26,6 +27,7 @@ import org.thingsboard.server.common.data.id.EntityId;
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
@ToString(callSuper = true)
public class CfReprocessingTask extends Task {
private CalculatedField calculatedField;

3
common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java

@ -34,6 +34,9 @@ public class DummyJobConfiguration implements JobConfiguration {
private List<String> errors;
private int retries;
private String generalError;
private int submittedTasksBeforeGeneralError;
@Override
public JobType getType() {
return JobType.DUMMY;

2
common/data/src/main/java/org/thingsboard/server/common/data/job/DummyTask.java

@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.job;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import java.util.List;
@ -26,6 +27,7 @@ import java.util.List;
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
@ToString(callSuper = true)
public class DummyTask extends Task {
private int number;

3
common/data/src/main/java/org/thingsboard/server/common/data/job/Job.java

@ -21,6 +21,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.HasTenantId;
import org.thingsboard.server.common.data.id.JobId;
@ -28,6 +29,7 @@ import org.thingsboard.server.common.data.id.TenantId;
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class Job extends BaseData<JobId> implements HasTenantId {
@ -51,7 +53,6 @@ public class Job extends BaseData<JobId> implements HasTenantId {
this.key = key;
this.description = description;
this.configuration = configuration;
this.status = JobStatus.PENDING;
this.result = switch (type) {
case CF_REPROCESSING -> new CfReprocessingJobResult();
case DUMMY -> new DummyJobResult();

5
common/data/src/main/java/org/thingsboard/server/common/data/job/JobResult.java

@ -39,15 +39,16 @@ public abstract class JobResult implements Serializable {
private int successfulCount;
private int failedCount;
private int cancelledCount;
private int discardedCount;
private Integer totalCount = null; // set when all tasks are submitted
private Map<String, String> failures = new HashMap<>();
private String generalError;
private long cancellationTs;
@JsonIgnore
public int getCompletedCount() {
return successfulCount + failedCount + cancelledCount;
return successfulCount + failedCount + discardedCount;
}
public abstract JobType getJobType();

16
common/data/src/main/java/org/thingsboard/server/common/data/job/JobStatus.java

@ -16,9 +16,23 @@
package org.thingsboard.server.common.data.job;
public enum JobStatus {
QUEUED,
PENDING,
RUNNING,
COMPLETED,
FAILED,
CANCELLED
CANCELLED;
public boolean isOneOf(JobStatus... statuses) {
if (statuses == null) {
return false;
}
for (JobStatus status : statuses) {
if (this == status) {
return true;
}
}
return false;
}
}

2
common/data/src/main/java/org/thingsboard/server/common/data/job/TaskResult.java

@ -27,7 +27,7 @@ import lombok.NoArgsConstructor;
public class TaskResult {
private boolean success;
private boolean cancelled;
private boolean discarded;
private TaskFailure failure;
@Data

2
common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java

@ -52,7 +52,7 @@ public class JobStatsService {
}
private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) {
log.info("[{}] Reporting: {}", jobId, statsMsg);
log.debug("[{}] Reporting: {}", jobId, statsMsg);
statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setJobIdMSB(jobId.getId().getMostSignificantBits())

16
common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java

@ -57,7 +57,7 @@ public abstract class TaskProcessor<T extends Task> {
private ExecutorService consumerExecutor;
private final Set<UUID> deletedTenants = ConcurrentHashMap.newKeySet();
private final Set<UUID> cancelledJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine
private final Set<UUID> discardedJobs = ConcurrentHashMap.newKeySet(); // fixme use caffeine
@PostConstruct
public void init() {
@ -83,14 +83,14 @@ public abstract class TaskProcessor<T extends Task> {
switch (entityId.getEntityType()) {
case JOB -> {
if (event.getEvent() == ComponentLifecycleEvent.STOPPED) {
log.info("Adding job {} to cancelledJobs", entityId);
addToCancelledJobs(entityId.getId());
log.debug("Adding job {} to discarded", entityId);
addToDiscardedJobs(entityId.getId());
}
}
case TENANT -> {
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
deletedTenants.add(entityId.getId());
log.info("Adding tenant {} to deletedTenants", entityId);
log.debug("Adding tenant {} to deleted", entityId);
}
}
}
@ -100,7 +100,7 @@ public abstract class TaskProcessor<T extends Task> {
for (TbProtoQueueMsg<TaskProto> msg : msgs) {
try {
Task task = JacksonUtil.fromString(msg.getValue().getValue(), Task.class);
if (cancelledJobs.contains(task.getJobId().getId())) {
if (discardedJobs.contains(task.getJobId().getId())) {
log.info("Skipping task '{}' for cancelled job {}", task.getKey(), task.getJobId());
reportCancelled(task);
continue;
@ -157,13 +157,13 @@ public abstract class TaskProcessor<T extends Task> {
private void reportCancelled(Task task) {
TaskResult result = TaskResult.builder()
.cancelled(true)
.discarded(true)
.build();
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
}
public void addToCancelledJobs(UUID jobId) {
cancelledJobs.add(jobId);
public void addToDiscardedJobs(UUID jobId) {
discardedJobs.add(jobId);
}
@PreDestroy

119
dao/src/main/java/org/thingsboard/server/dao/task/DefaultJobService.java → dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.task;
package org.thingsboard.server.dao.job;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -28,17 +28,24 @@ import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobResult;
import org.thingsboard.server.common.data.job.JobStats;
import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.TaskResult;
import org.thingsboard.server.common.data.job.TaskResult.TaskFailure;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
import java.util.Optional;
import static org.thingsboard.server.common.data.job.JobStatus.CANCELLED;
import static org.thingsboard.server.common.data.job.JobStatus.COMPLETED;
import static org.thingsboard.server.common.data.job.JobStatus.FAILED;
import static org.thingsboard.server.common.data.job.JobStatus.PENDING;
import static org.thingsboard.server.common.data.job.JobStatus.QUEUED;
import static org.thingsboard.server.common.data.job.JobStatus.RUNNING;
@Service
@RequiredArgsConstructor
@Slf4j
@ -47,10 +54,16 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
private final JobDao jobDao;
private final JobValidator validator = new JobValidator();
@Transactional
@Override
public Job createJob(TenantId tenantId, Job job) {
validator.validate(job, Job::getTenantId);
return saveJob(tenantId, job, false);
if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) {
job.setStatus(QUEUED);
} else {
job.setStatus(PENDING);
}
return saveJob(tenantId, job, true, null);
}
@Override
@ -62,11 +75,27 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
@Override
public void cancelJob(TenantId tenantId, JobId jobId) {
Job job = findForUpdate(tenantId, jobId);
if (job.getStatus() != JobStatus.PENDING && job.getStatus() != JobStatus.RUNNING) {
if (!job.getStatus().isOneOf(QUEUED, PENDING, RUNNING)) {
throw new IllegalArgumentException("Job already " + job.getStatus().name().toLowerCase());
}
job.getResult().setCancellationTs(System.currentTimeMillis());
saveJob(tenantId, job, true);
JobStatus prevStatus = job.getStatus();
if (job.getStatus() == QUEUED) {
job.setStatus(CANCELLED); // setting cancelled status right away, because we don't expect stats for cancelled tasks
} else if (job.getStatus() == PENDING) {
job.setStatus(RUNNING);
}
saveJob(tenantId, job, true, prevStatus);
}
@Transactional
@Override
public void markAsFailed(TenantId tenantId, JobId jobId, String error) {
Job job = findForUpdate(tenantId, jobId);
job.getResult().setGeneralError(error);
JobStatus prevStatus = job.getStatus();
job.setStatus(FAILED);
saveJob(tenantId, job, true, prevStatus);
}
@Transactional
@ -74,39 +103,36 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
public void processStats(TenantId tenantId, JobId jobId, JobStats jobStats) {
Job job = findForUpdate(tenantId, jobId);
if (job == null) {
log.info("Got stale stats for job {}: {}", jobId, jobStats);
log.debug("[{}][{}] Got stale stats: {}", tenantId, jobId, jobStats);
return;
}
switch (job.getStatus()) {
case PENDING -> {
job.setStatus(JobStatus.RUNNING);
}
case CANCELLED, COMPLETED, FAILED -> {
// got some stale stats
return;
}
JobStatus prevStatus = job.getStatus();
if (job.getStatus() == PENDING) {
job.setStatus(RUNNING);
}
JobResult jobResult = job.getResult();
JobResult result = job.getResult();
if (jobStats.getTotalTasksCount() != null) {
jobResult.setTotalCount(jobStats.getTotalTasksCount());
result.setTotalCount(jobStats.getTotalTasksCount());
}
boolean publishEvent = false;
for (TaskResult taskResult : jobStats.getTaskResults()) {
if (taskResult.isSuccess()) {
jobResult.setSuccessfulCount(jobResult.getSuccessfulCount() + 1);
} else if (taskResult.isCancelled()) {
jobResult.setCancelledCount(jobResult.getCancelledCount() + 1);
result.setSuccessfulCount(result.getSuccessfulCount() + 1);
} else if (taskResult.isDiscarded()) {
result.setDiscardedCount(result.getDiscardedCount() + 1);
} else {
TaskFailure failure = taskResult.getFailure();
String key = failure.getTask().getKey();
jobResult.setFailedCount(jobResult.getFailedCount() + 1);
jobResult.getFailures().put(key, failure.getError());
result.setFailedCount(result.getFailedCount() + 1);
if (result.getFailures().size() < 1000) { // preserving only first 1000 errors, not reprocessing if there are more failures
result.getFailures().put(key, failure.getError());
}
}
if (jobResult.getCancellationTs() > 0) {
if (!taskResult.isCancelled() && System.currentTimeMillis() > jobResult.getCancellationTs()) {
if (result.getCancellationTs() > 0) {
if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) {
log.info("Got task result for cancelled job {}: {}, re-notifying processors about cancellation", jobId, taskResult);
// task processor forgot the task is cancelled
publishEvent = true;
@ -114,32 +140,49 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
}
}
if (jobResult.getTotalCount() != null && jobResult.getCompletedCount() >= jobResult.getTotalCount()) {
if (jobResult.getCancellationTs() > 0) {
job.setStatus(JobStatus.CANCELLED);
} else if (jobResult.getFailedCount() > 0) {
job.setStatus(JobStatus.FAILED);
} else {
job.setStatus(JobStatus.COMPLETED);
if (job.getStatus() == RUNNING) {
if (result.getTotalCount() != null && result.getCompletedCount() >= result.getTotalCount()) {
if (result.getCancellationTs() > 0) {
job.setStatus(CANCELLED);
} else if (result.getFailedCount() > 0) {
job.setStatus(FAILED);
} else {
job.setStatus(COMPLETED);
}
}
}
log.info("Saving job {}", job);
saveJob(tenantId, job, publishEvent);
saveJob(tenantId, job, publishEvent, prevStatus);
}
private Job saveJob(TenantId tenantId, Job job, boolean publishEvent) {
private Job saveJob(TenantId tenantId, Job job, boolean publishEvent, JobStatus prevStatus) {
job = jobDao.save(tenantId, job);
if (publishEvent) {
eventPublisher.publishEvent(SaveEntityEvent.builder()
.tenantId(tenantId)
.entityId(job.getId())
.entity(job)
.created(false)
.build());
}
log.info("[{}] Saved job: {}", tenantId, job);
if (prevStatus != null && job.getStatus() != prevStatus) {
log.info("[{}][{}][{}] New job status: {} -> {}", tenantId, job.getId(), job.getType(), prevStatus, job.getStatus());
if (job.getStatus().isOneOf(CANCELLED, COMPLETED, FAILED) && prevStatus != QUEUED) { // if prev status is QUEUED - means there are already running jobs with this type, no need to check for waiting job
checkWaitingJobs(tenantId, job.getType());
}
}
return job;
}
private void checkWaitingJobs(TenantId tenantId, JobType jobType) {
Job queuedJob = jobDao.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId, jobType, QUEUED);
if (queuedJob == null) {
return;
}
queuedJob.setStatus(PENDING);
saveJob(tenantId, queuedJob, true, QUEUED);
}
@Override
public PageData<Job> findJobsByTenantId(TenantId tenantId, PageLink pageLink) {
return jobDao.findByTenantId(tenantId, pageLink);
@ -149,15 +192,15 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
return jobDao.findByIdForUpdate(tenantId, jobId);
}
// todo: cancellation, reprocessing
// todo: reprocessing
public class JobValidator extends DataValidator<Job> {
@Override
protected void validateCreate(TenantId tenantId, Job job) {
if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), JobStatus.PENDING, JobStatus.RUNNING)) {
throw new DataValidationException("Job of this type is already running");
}
// if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) {
// throw new DataValidationException("Job of this type is already running");
// }
}
@Override

4
dao/src/main/java/org/thingsboard/server/dao/task/JobDao.java → dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.task;
package org.thingsboard.server.dao.job;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
@ -34,4 +34,6 @@ public interface JobDao extends Dao<Job> {
boolean existsByTenantIdAndTypeAndStatusOneOf(TenantId tenantId, JobType type, JobStatus... statuses);
Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status);
}

49
dao/src/main/java/org/thingsboard/server/dao/sql/task/JobRepository.java → dao/src/main/java/org/thingsboard/server/dao/sql/job/JobRepository.java

@ -13,15 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.task;
package org.thingsboard.server.dao.sql.job;
import jakarta.persistence.LockModeType;
import jakarta.transaction.Transactional;
import org.springframework.data.domain.Limit;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
@ -36,7 +35,7 @@ import java.util.UUID;
public interface JobRepository extends JpaRepository<JobEntity, UUID> {
@Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId " +
"AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true " +
"AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true " +
"OR ilike(j.description, concat('%', :searchText, '%')) = true)")
Page<JobEntity> findByTenantIdAndSearchText(@Param("tenantId") UUID tenantId,
@Param("searchText") String searchText,
@ -46,45 +45,13 @@ public interface JobRepository extends JpaRepository<JobEntity, UUID> {
@Query("SELECT j FROM JobEntity j WHERE j.id = :id")
JobEntity findByIdForUpdate(UUID id);
@Modifying
@Transactional
@Query(value = """
UPDATE job
SET result = jsonb_set(
result,
'{successfulCount}',
to_jsonb((result->>'successfulCount')::int + :count)
)
WHERE id = :jobId
RETURNING ((result->>'successfulCount')::int + :count)
+ (result->>'failedCount')::int = (result->>'totalCount')::int
""", nativeQuery = true)
boolean reportTaskSuccess(@Param("jobId") UUID jobId,
@Param("count") int count);
@Modifying
@Transactional
@Query(value = """
UPDATE job
SET result = jsonb_set(
jsonb_set(
result,
'{failedCount}',
to_jsonb((result->>'failedCount')::int + 1)
),
ARRAY['failures', :taskKey],
to_jsonb(:error)
)
WHERE id = :jobId
RETURNING ((result->>'failedCount')::int + 1) + (result->>'successfulCount')::int
= (result->>'totalCount')::int
""", nativeQuery = true)
boolean reportTaskFailure(@Param("jobId") UUID jobId,
@Param("taskKey") String taskKey,
@Param("error") String error);
boolean existsByKeyAndStatusIn(String key, List<JobStatus> statuses);
boolean existsByTenantIdAndTypeAndStatusIn(UUID tenantId, JobType type, List<JobStatus> statuses);
@Lock(LockModeType.PESSIMISTIC_WRITE) // SELECT FOR UPDATE
@Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId AND j.type = :type " +
"AND j.status = :status ORDER BY j.createdTime ASC, j.id ASC")
JobEntity findOldestByTenantIdAndTypeAndStatusForUpdate(UUID tenantId, JobType type, JobStatus status, Limit limit);
}

10
dao/src/main/java/org/thingsboard/server/dao/sql/task/JpaJobDao.java → dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java

@ -13,10 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.task;
package org.thingsboard.server.dao.sql.job;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Limit;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntityType;
@ -30,7 +31,7 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.JobEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDao;
import org.thingsboard.server.dao.task.JobDao;
import org.thingsboard.server.dao.job.JobDao;
import org.thingsboard.server.dao.util.SqlDao;
import java.util.Arrays;
@ -63,6 +64,11 @@ public class JpaJobDao extends JpaAbstractDao<JobEntity, Job> implements JobDao
return jobRepository.existsByTenantIdAndTypeAndStatusIn(tenantId.getId(), type, Arrays.stream(statuses).toList());
}
@Override
public Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status) {
return DaoUtil.getData(jobRepository.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId.getId(), type, status, Limit.of(1)));
}
@Override
public EntityType getEntityType() {
return EntityType.JOB;
Loading…
Cancel
Save