committed by
GitHub
104 changed files with 4121 additions and 288 deletions
@ -0,0 +1,111 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import io.swagger.v3.oas.annotations.Parameter; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.security.access.prepost.PreAuthorize; |
|||
import org.springframework.web.bind.annotation.DeleteMapping; |
|||
import org.springframework.web.bind.annotation.GetMapping; |
|||
import org.springframework.web.bind.annotation.PathVariable; |
|||
import org.springframework.web.bind.annotation.PostMapping; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RequestParam; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
import org.thingsboard.rule.engine.api.JobManager; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobFilter; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.dao.job.JobService; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
|
|||
import java.util.List; |
|||
import java.util.UUID; |
|||
|
|||
import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION; |
|||
|
|||
@RestController |
|||
@TbCoreComponent |
|||
@RequestMapping("/api") |
|||
@RequiredArgsConstructor |
|||
@Slf4j |
|||
public class JobController extends BaseController { |
|||
|
|||
private final JobService jobService; |
|||
private final JobManager jobManager; |
|||
|
|||
@GetMapping("/job/{id}") |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
public Job getJobById(@PathVariable UUID id) throws ThingsboardException { |
|||
return jobService.findJobById(getTenantId(), new JobId(id)); |
|||
} |
|||
|
|||
@GetMapping("/jobs") |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
public PageData<Job> getJobs(@Parameter(description = PAGE_SIZE_DESCRIPTION, required = true) |
|||
@RequestParam int pageSize, |
|||
@Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true) |
|||
@RequestParam int page, |
|||
@Parameter(description = "Case-insensitive 'substring' filter based on job's description") |
|||
@RequestParam(required = false) String textSearch, |
|||
@Parameter(description = SORT_PROPERTY_DESCRIPTION) |
|||
@RequestParam(required = false) String sortProperty, |
|||
@Parameter(description = SORT_ORDER_DESCRIPTION) |
|||
@RequestParam(required = false) String sortOrder, |
|||
@RequestParam(required = false) List<JobType> types, |
|||
@RequestParam(required = false) List<JobStatus> statuses, |
|||
@RequestParam(required = false) List<UUID> entities, |
|||
@RequestParam(required = false) Long startTime, |
|||
@RequestParam(required = false) Long endTime) throws ThingsboardException { |
|||
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); |
|||
JobFilter filter = JobFilter.builder() |
|||
.types(types) |
|||
.statuses(statuses) |
|||
.entities(entities) |
|||
.startTime(startTime) |
|||
.endTime(endTime) |
|||
.build(); |
|||
return jobService.findJobsByFilter(getTenantId(), filter, pageLink); |
|||
} |
|||
|
|||
@PostMapping("/job/{id}/cancel") |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
public void cancelJob(@PathVariable UUID id) throws ThingsboardException { |
|||
jobManager.cancelJob(getTenantId(), new JobId(id)); |
|||
} |
|||
|
|||
@PostMapping("/job/{id}/reprocess") |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
public void reprocessJob(@PathVariable UUID id) throws ThingsboardException { |
|||
jobManager.reprocessJob(getTenantId(), new JobId(id)); |
|||
} |
|||
|
|||
@DeleteMapping("/job/{id}") |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") |
|||
public void deleteJob(@PathVariable UUID id) throws ThingsboardException { |
|||
jobService.deleteJob(getTenantId(), new JobId(id)); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.housekeeper.processor; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; |
|||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType; |
|||
import org.thingsboard.server.dao.job.JobService; |
|||
|
|||
@Component |
|||
@RequiredArgsConstructor |
|||
@Slf4j |
|||
public class JobsDeletionTaskProcessor extends HousekeeperTaskProcessor<HousekeeperTask> { |
|||
|
|||
private final JobService jobService; |
|||
|
|||
@Override |
|||
public void process(HousekeeperTask task) throws Exception { |
|||
int deletedCount = jobService.deleteJobsByEntityId(task.getTenantId(), task.getEntityId()); |
|||
log.debug("[{}][{}][{}] Deleted {} jobs", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), deletedCount); |
|||
} |
|||
|
|||
@Override |
|||
public HousekeeperTaskType getTaskType() { |
|||
return HousekeeperTaskType.DELETE_JOBS; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,207 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import jakarta.annotation.PreDestroy; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.ObjectUtils; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.common.util.ThingsBoardExecutors; |
|||
import org.thingsboard.rule.engine.api.JobManager; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobResult; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.job.task.Task; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.dao.job.JobService; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueCallback; |
|||
import org.thingsboard.server.queue.TbQueueMsgMetadata; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.settings.TasksQueueConfig; |
|||
import org.thingsboard.server.queue.task.JobStatsService; |
|||
import org.thingsboard.server.queue.task.TaskProducerQueueFactory; |
|||
|
|||
import java.util.Arrays; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.function.Function; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Component |
|||
@Slf4j |
|||
public class DefaultJobManager implements JobManager { |
|||
|
|||
private final JobService jobService; |
|||
private final JobStatsService jobStatsService; |
|||
private final PartitionService partitionService; |
|||
private final TasksQueueConfig queueConfig; |
|||
private final Map<JobType, JobProcessor> jobProcessors; |
|||
private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers; |
|||
private final ExecutorService executor; |
|||
|
|||
public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, PartitionService partitionService, |
|||
TaskProducerQueueFactory queueFactory, TasksQueueConfig queueConfig, |
|||
List<JobProcessor> jobProcessors) { |
|||
this.jobService = jobService; |
|||
this.jobStatsService = jobStatsService; |
|||
this.partitionService = partitionService; |
|||
this.queueConfig = queueConfig; |
|||
this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity())); |
|||
this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer)); |
|||
this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Job> submitJob(Job job) { |
|||
log.debug("Submitting job: {}", job); |
|||
return Futures.submit(() -> jobService.saveJob(job.getTenantId(), job), executor); |
|||
} |
|||
|
|||
@Override |
|||
public void onJobUpdate(Job job) { |
|||
JobStatus status = job.getStatus(); |
|||
switch (status) { |
|||
case PENDING -> { |
|||
executor.execute(() -> { |
|||
try { |
|||
processJob(job); |
|||
} catch (Throwable e) { |
|||
log.error("Failed to process job update: {}", job, e); |
|||
} |
|||
}); |
|||
} |
|||
case COMPLETED, FAILED -> { |
|||
executor.execute(() -> { |
|||
try { |
|||
getJobProcessor(job.getType()).onJobFinished(job); |
|||
} catch (Throwable e) { |
|||
log.error("Failed to process job update: {}", job, e); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void processJob(Job job) { |
|||
TenantId tenantId = job.getTenantId(); |
|||
JobId jobId = job.getId(); |
|||
try { |
|||
JobProcessor processor = getJobProcessor(job.getType()); |
|||
List<TaskResult> toReprocess = job.getConfiguration().getToReprocess(); |
|||
if (toReprocess == null) { |
|||
int tasksCount = processor.process(job, this::submitTask); |
|||
log.info("[{}][{}][{}] Submitted {} tasks", tenantId, jobId, job.getType(), tasksCount); |
|||
jobStatsService.reportAllTasksSubmitted(tenantId, jobId, tasksCount); |
|||
} else { |
|||
processor.reprocess(job, toReprocess, this::submitTask); |
|||
log.info("[{}][{}][{}] Submitted {} tasks for reprocessing", tenantId, jobId, job.getType(), toReprocess.size()); |
|||
} |
|||
} catch (Throwable e) { |
|||
log.error("[{}][{}][{}] Failed to submit tasks", tenantId, jobId, job.getType(), e); |
|||
jobService.markAsFailed(tenantId, jobId, e.getMessage()); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void cancelJob(TenantId tenantId, JobId jobId) { |
|||
log.info("[{}][{}] Cancelling job", tenantId, jobId); |
|||
jobService.cancelJob(tenantId, jobId); |
|||
} |
|||
|
|||
@Override |
|||
public void reprocessJob(TenantId tenantId, JobId jobId) { |
|||
log.info("[{}][{}] Reprocessing job", tenantId, jobId); |
|||
Job job = jobService.findJobById(tenantId, jobId); |
|||
if (job.getStatus() != JobStatus.FAILED) { |
|||
throw new IllegalArgumentException("Job is not failed"); |
|||
} |
|||
|
|||
JobResult result = job.getResult(); |
|||
if (result.getGeneralError() != null) { |
|||
job.presetResult(); |
|||
} else { |
|||
List<TaskResult> taskFailures = result.getResults().stream() |
|||
.filter(taskResult -> !taskResult.isSuccess() && !taskResult.isDiscarded()) |
|||
.toList(); |
|||
if (result.getFailedCount() > taskFailures.size()) { |
|||
throw new IllegalArgumentException("Reprocessing not allowed since there are too many failures (more than " + taskFailures.size() + ")"); |
|||
} |
|||
result.setFailedCount(0); |
|||
result.setResults(result.getResults().stream() |
|||
.filter(TaskResult::isSuccess) |
|||
.toList()); |
|||
job.getConfiguration().setToReprocess(taskFailures); |
|||
} |
|||
job.getConfiguration().setTasksKey(UUID.randomUUID().toString()); |
|||
jobService.saveJob(tenantId, job); |
|||
} |
|||
|
|||
private void submitTask(Task<?> task) { |
|||
if (ObjectUtils.anyNull(task.getTenantId(), task.getJobId(), task.getKey())) { |
|||
throw new IllegalArgumentException("Task " + task + " missing required fields"); |
|||
} |
|||
|
|||
log.debug("[{}][{}] Submitting task: {}", task.getTenantId(), task.getJobId(), task); |
|||
TaskProto taskProto = TaskProto.newBuilder() |
|||
.setValue(JacksonUtil.toString(task)) |
|||
.build(); |
|||
|
|||
TbQueueProducer<TbProtoQueueMsg<TaskProto>> producer = taskProducers.get(task.getJobType()); |
|||
EntityId entityId = null; |
|||
if (queueConfig.getPartitioningStrategy().equals("entity")) { |
|||
entityId = task.getEntityId(); |
|||
} |
|||
if (entityId == null) { |
|||
entityId = task.getTenantId(); |
|||
} |
|||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TASK_PROCESSOR, task.getJobType().name(), task.getTenantId(), entityId); |
|||
producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), taskProto), new TbQueueCallback() { |
|||
@Override |
|||
public void onSuccess(TbQueueMsgMetadata metadata) { |
|||
log.trace("Submitted task to {}: {}", tpi, taskProto); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
log.warn("Failed to submit task: {}", task, t); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private JobProcessor getJobProcessor(JobType jobType) { |
|||
return jobProcessors.get(jobType); |
|||
} |
|||
|
|||
@PreDestroy |
|||
private void destroy() { |
|||
executor.shutdownNow(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,93 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.job.DummyJobConfiguration; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.job.task.DummyTask; |
|||
import org.thingsboard.server.common.data.job.task.DummyTaskResult; |
|||
import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFailure; |
|||
import org.thingsboard.server.common.data.job.task.Task; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.function.Consumer; |
|||
|
|||
@Component |
|||
@RequiredArgsConstructor |
|||
public class DummyJobProcessor implements JobProcessor { |
|||
|
|||
@Override |
|||
public int process(Job job, Consumer<Task<?>> taskConsumer) throws Exception { |
|||
DummyJobConfiguration configuration = job.getConfiguration(); |
|||
if (configuration.getGeneralError() != null) { |
|||
for (int number = 1; number <= configuration.getSubmittedTasksBeforeGeneralError(); number++) { |
|||
taskConsumer.accept(createTask(job, configuration, number, null, false)); |
|||
} |
|||
Thread.sleep(configuration.getTaskProcessingTimeMs() * (configuration.getSubmittedTasksBeforeGeneralError() / 2)); // sleeping so that some tasks are processed
|
|||
throw new RuntimeException(configuration.getGeneralError()); |
|||
} |
|||
|
|||
int taskNumber = 1; |
|||
for (int i = 0; i < configuration.getSuccessfulTasksCount(); i++) { |
|||
taskConsumer.accept(createTask(job, configuration, taskNumber, null, false)); |
|||
taskNumber++; |
|||
} |
|||
if (configuration.getErrors() != null) { |
|||
for (int i = 0; i < configuration.getFailedTasksCount(); i++) { |
|||
taskConsumer.accept(createTask(job, configuration, taskNumber, configuration.getErrors(), false)); |
|||
taskNumber++; |
|||
} |
|||
for (int i = 0; i < configuration.getPermanentlyFailedTasksCount(); i++) { |
|||
taskConsumer.accept(createTask(job, configuration, taskNumber, configuration.getErrors(), true)); |
|||
taskNumber++; |
|||
} |
|||
} |
|||
return configuration.getSuccessfulTasksCount() + configuration.getFailedTasksCount() + configuration.getPermanentlyFailedTasksCount(); |
|||
} |
|||
|
|||
@Override |
|||
public void reprocess(Job job, List<TaskResult> taskFailures, Consumer<Task<?>> taskConsumer) throws Exception { |
|||
for (TaskResult taskFailure : taskFailures) { |
|||
DummyTaskFailure failure = ((DummyTaskResult) taskFailure).getFailure(); |
|||
taskConsumer.accept(createTask(job, job.getConfiguration(), failure.getNumber(), failure.isFailAlways() ? |
|||
List.of(failure.getError()) : Collections.emptyList(), failure.isFailAlways())); |
|||
} |
|||
} |
|||
|
|||
private DummyTask createTask(Job job, DummyJobConfiguration configuration, int number, List<String> errors, boolean failAlways) { |
|||
return DummyTask.builder() |
|||
.tenantId(job.getTenantId()) |
|||
.jobId(job.getId()) |
|||
.key(configuration.getTasksKey()) |
|||
.retries(configuration.getRetries()) |
|||
.number(number) |
|||
.processingTimeMs(configuration.getTaskProcessingTimeMs()) |
|||
.errors(errors) |
|||
.failAlways(failAlways) |
|||
.build(); |
|||
} |
|||
|
|||
@Override |
|||
public JobType getType() { |
|||
return JobType.DUMMY; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,36 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job; |
|||
|
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.job.task.Task; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
|
|||
import java.util.List; |
|||
import java.util.function.Consumer; |
|||
|
|||
public interface JobProcessor { |
|||
|
|||
int process(Job job, Consumer<Task<?>> taskConsumer) throws Exception; |
|||
|
|||
void reprocess(Job job, List<TaskResult> taskFailures, Consumer<Task<?>> taskConsumer) throws Exception; |
|||
|
|||
default void onJobFinished(Job job) {} |
|||
|
|||
JobType getType(); |
|||
|
|||
} |
|||
@ -0,0 +1,115 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job; |
|||
|
|||
import jakarta.annotation.PreDestroy; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.JobStats; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
import org.thingsboard.server.dao.job.JobService; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; |
|||
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; |
|||
import org.thingsboard.server.queue.settings.TasksQueueConfig; |
|||
import org.thingsboard.server.queue.util.AfterStartUp; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
|
|||
@TbCoreComponent |
|||
@Component |
|||
@Slf4j |
|||
public class JobStatsProcessor { |
|||
|
|||
private final JobService jobService; |
|||
private final TasksQueueConfig queueConfig; |
|||
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer; |
|||
private final ExecutorService consumerExecutor; |
|||
|
|||
public JobStatsProcessor(JobService jobService, |
|||
TasksQueueConfig queueConfig, |
|||
TbCoreQueueFactory queueFactory) { |
|||
this.jobService = jobService; |
|||
this.queueConfig = queueConfig; |
|||
this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("job-stats-consumer")); |
|||
this.jobStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder() |
|||
.name("job-stats") |
|||
.msgPackProcessor(this::processStats) |
|||
.pollInterval(queueConfig.getStatsPollInterval()) |
|||
.consumerCreator(queueFactory::createJobStatsConsumer) |
|||
.consumerExecutor(consumerExecutor) |
|||
.build(); |
|||
} |
|||
|
|||
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) |
|||
public void afterStartUp() { |
|||
jobStatsConsumer.subscribe(); |
|||
jobStatsConsumer.launch(); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
private void processStats(List<TbProtoQueueMsg<JobStatsMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> consumer) { |
|||
Map<JobId, JobStats> stats = new HashMap<>(); |
|||
|
|||
for (TbProtoQueueMsg<JobStatsMsg> msg : msgs) { |
|||
JobStatsMsg statsMsg = msg.getValue(); |
|||
TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); |
|||
JobId jobId = new JobId(new UUID(statsMsg.getJobIdMSB(), statsMsg.getJobIdLSB())); |
|||
JobStats jobStats = stats.computeIfAbsent(jobId, __ -> new JobStats(tenantId, jobId)); |
|||
|
|||
if (statsMsg.hasTaskResult()) { |
|||
TaskResult taskResult = JacksonUtil.fromString(statsMsg.getTaskResult().getValue(), TaskResult.class); |
|||
jobStats.getTaskResults().add(taskResult); |
|||
} |
|||
if (statsMsg.hasTotalTasksCount()) { |
|||
jobStats.setTotalTasksCount(statsMsg.getTotalTasksCount()); |
|||
} |
|||
} |
|||
|
|||
stats.forEach((jobId, jobStats) -> { |
|||
TenantId tenantId = jobStats.getTenantId(); |
|||
try { |
|||
log.debug("[{}][{}] Processing job stats: {}", tenantId, jobId, stats); |
|||
jobService.processStats(tenantId, jobId, jobStats); |
|||
} catch (Exception e) { |
|||
log.error("[{}][{}] Failed to process job stats: {}", tenantId, jobId, jobStats, e); |
|||
} |
|||
}); |
|||
consumer.commit(); |
|||
|
|||
Thread.sleep(queueConfig.getStatsProcessingInterval()); |
|||
} |
|||
|
|||
@PreDestroy |
|||
private void destroy() { |
|||
jobStatsConsumer.stop(); |
|||
consumerExecutor.shutdownNow(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job.task; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.job.task.DummyTask; |
|||
import org.thingsboard.server.common.data.job.task.DummyTaskResult; |
|||
import org.thingsboard.server.queue.task.TaskProcessor; |
|||
|
|||
@RequiredArgsConstructor |
|||
public class DummyTaskProcessor extends TaskProcessor<DummyTask, DummyTaskResult> { |
|||
|
|||
@Override |
|||
public DummyTaskResult process(DummyTask task) throws Exception { |
|||
if (task.getProcessingTimeMs() > 0) { |
|||
Thread.sleep(task.getProcessingTimeMs()); |
|||
} |
|||
if (task.isFailAlways()) { |
|||
throw new RuntimeException(task.getErrors().get(0)); |
|||
} |
|||
if (task.getErrors() != null && task.getAttempt() <= task.getErrors().size()) { |
|||
String error = task.getErrors().get(task.getAttempt() - 1); |
|||
throw new RuntimeException(error); |
|||
} |
|||
return DummyTaskResult.success(task); |
|||
} |
|||
|
|||
@Override |
|||
public long getTaskProcessingTimeout() { |
|||
return 2000; |
|||
} |
|||
|
|||
@Override |
|||
public JobType getJobType() { |
|||
return JobType.DUMMY; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,92 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.telemetry; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import com.google.common.util.concurrent.SettableFuture; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.kv.Aggregation; |
|||
import org.thingsboard.server.common.data.kv.AggregationParams; |
|||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; |
|||
import org.thingsboard.server.common.data.kv.IntervalType; |
|||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|||
import org.thingsboard.server.service.security.AccessValidator; |
|||
import org.thingsboard.server.service.security.ValidationResult; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
import org.thingsboard.server.service.security.permission.Operation; |
|||
|
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
@RequiredArgsConstructor |
|||
public class DefaultTbTelemetryService implements TbTelemetryService { |
|||
|
|||
private final TimeseriesService tsService; |
|||
private final AccessValidator accessValidator; |
|||
|
|||
@Override |
|||
public ListenableFuture<List<TsKvEntry>> getTimeseries(EntityId entityId, List<String> keys, Long startTs, Long endTs, IntervalType intervalType, |
|||
Long interval, String timeZone, Integer limit, Aggregation agg, String orderBy, |
|||
Boolean useStrictDataTypes, SecurityUser currentUser) { |
|||
SettableFuture<List<TsKvEntry>> future = SettableFuture.create(); |
|||
accessValidator.validate(currentUser, Operation.READ_TELEMETRY, entityId, new FutureCallback<>() { |
|||
@Override |
|||
public void onSuccess(ValidationResult validationResult) { |
|||
try { |
|||
AggregationParams params; |
|||
if (Aggregation.NONE.equals(agg)) { |
|||
params = AggregationParams.none(); |
|||
} else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) { |
|||
params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval); |
|||
} else { |
|||
params = AggregationParams.calendar(agg, intervalType, timeZone); |
|||
} |
|||
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList()); |
|||
Futures.addCallback(tsService.findAll(currentUser.getTenantId(), entityId, queries), new FutureCallback<>() { |
|||
@Override |
|||
public void onSuccess(List<TsKvEntry> result) { |
|||
future.set(result); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
future.setException(t); |
|||
} |
|||
}, MoreExecutors.directExecutor()); |
|||
} catch (Throwable e) { |
|||
onFailure(e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
future.setException(t); |
|||
} |
|||
}); |
|||
return future; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.telemetry; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.kv.Aggregation; |
|||
import org.thingsboard.server.common.data.kv.IntervalType; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface TbTelemetryService { |
|||
|
|||
ListenableFuture<List<TsKvEntry>> getTimeseries(EntityId entityId, |
|||
List<String> keys, |
|||
Long startTs, |
|||
Long endTs, |
|||
IntervalType intervalType, |
|||
Long interval, |
|||
String timeZone, |
|||
Integer limit, |
|||
Aggregation agg, |
|||
String orderBy, |
|||
Boolean useStrictDataTypes, |
|||
SecurityUser currentUser) throws ThingsboardException; |
|||
|
|||
} |
|||
@ -0,0 +1,478 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job; |
|||
|
|||
import lombok.SneakyThrows; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.mockito.Mockito; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.boot.test.mock.mockito.SpyBean; |
|||
import org.springframework.test.context.TestPropertySource; |
|||
import org.thingsboard.rule.engine.api.JobManager; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.DummyJobConfiguration; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobFilter; |
|||
import org.thingsboard.server.common.data.job.JobResult; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.job.task.DummyTaskResult; |
|||
import org.thingsboard.server.common.data.job.task.DummyTaskResult.DummyTaskFailure; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.controller.AbstractControllerTest; |
|||
import org.thingsboard.server.dao.job.JobDao; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
import org.thingsboard.server.queue.task.JobStatsService; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Comparator; |
|||
import java.util.List; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.awaitility.Awaitility.await; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.Mockito.doAnswer; |
|||
import static org.mockito.Mockito.never; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
|
|||
@DaoSqlTest |
|||
@TestPropertySource(properties = { |
|||
"queue.tasks.stats.processing_interval=0" |
|||
}) |
|||
public class JobManagerTest extends AbstractControllerTest { |
|||
|
|||
@Autowired |
|||
private JobManager jobManager; |
|||
|
|||
@SpyBean |
|||
private TestTaskProcessor taskProcessor; |
|||
|
|||
@SpyBean |
|||
private JobStatsService jobStatsService; |
|||
|
|||
@Autowired |
|||
private JobDao jobDao; |
|||
|
|||
private TenantId tenantId; |
|||
private Device jobEntity; |
|||
|
|||
@Before |
|||
public void setUp() throws Exception { |
|||
loginTenantAdmin(); |
|||
tenantId = super.tenantId; |
|||
jobEntity = createDevice("Test", "Test"); |
|||
} |
|||
|
|||
@After |
|||
public void tearDown() throws Exception { |
|||
} |
|||
|
|||
@Test |
|||
public void testSubmitJob_allTasksSuccessful() { |
|||
int tasksCount = 5; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(tasksCount) |
|||
.taskProcessingTimeMs(1000) |
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.RUNNING); |
|||
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1); |
|||
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); |
|||
}); |
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); |
|||
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount); |
|||
assertThat(job.getResult().getResults()).isEmpty(); |
|||
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testSubmitJob_someTasksPermanentlyFailed() { |
|||
int successfulTasks = 3; |
|||
int failedTasks = 2; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(successfulTasks) |
|||
.failedTasksCount(failedTasks) |
|||
.errors(List.of("error1", "error2", "error3")) |
|||
.retries(2) |
|||
.taskProcessingTimeMs(100) |
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
JobResult jobResult = job.getResult(); |
|||
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); |
|||
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); |
|||
assertThat(jobResult.getTotalCount()).isEqualTo(successfulTasks + failedTasks); |
|||
assertThat(getFailures(jobResult)).hasSize(2).allSatisfy(failure -> { |
|||
assertThat(failure.getError()).isEqualTo("error3"); // last error
|
|||
}); |
|||
assertThat(jobResult.getCompletedCount()).isEqualTo(jobResult.getTotalCount()); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testSubmitJob_taskTimeout() { |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(1) |
|||
.taskProcessingTimeMs(5000) // bigger than DummyTaskProcessor.getTaskProcessingTimeout()
|
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
JobResult jobResult = job.getResult(); |
|||
assertThat(jobResult.getFailedCount()).isEqualTo(1); |
|||
assertThat(((DummyTaskResult) jobResult.getResults().get(0)).getFailure().getError()).isEqualTo("Timeout after 2000 ms"); // last error
|
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testCancelJob_whileRunning() throws Exception { |
|||
int tasksCount = 100; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(tasksCount) |
|||
.taskProcessingTimeMs(100) |
|||
.build()).getId(); |
|||
|
|||
Thread.sleep(500); |
|||
cancelJob(jobId); |
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); |
|||
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1); |
|||
assertThat(job.getResult().getDiscardedCount()).isBetween(1, tasksCount - 1); |
|||
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); |
|||
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testCancelJob_simulateTaskProcessorRestart() throws Exception { |
|||
int tasksCount = 10; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(tasksCount) |
|||
.taskProcessingTimeMs(500) |
|||
.build()).getId(); |
|||
|
|||
// simulate cancelled jobs are forgotten
|
|||
AtomicInteger cancellationRenotifyAttempt = new AtomicInteger(0); |
|||
doAnswer(inv -> { |
|||
if (cancellationRenotifyAttempt.incrementAndGet() >= 5) { |
|||
inv.callRealMethod(); |
|||
} |
|||
return null; |
|||
}).when(taskProcessor).addToDiscarded(any()); // ignoring cancellation event,
|
|||
cancelJob(jobId); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); |
|||
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, tasksCount - 1); |
|||
assertThat(job.getResult().getDiscardedCount()).isBetween(1, tasksCount - 1); |
|||
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); |
|||
assertThat(job.getResult().getCompletedCount()).isEqualTo(tasksCount); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void whenTenantIsDeleted_thenCancelAllTheJobs() throws Exception { |
|||
loginSysAdmin(); |
|||
createDifferentTenant(); |
|||
|
|||
this.tenantId = this.differentTenantId; |
|||
submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(1000) |
|||
.taskProcessingTimeMs(500) |
|||
.build()); |
|||
|
|||
Thread.sleep(2000); |
|||
deleteDifferentTenant(); |
|||
Mockito.reset(jobStatsService); |
|||
|
|||
Thread.sleep(3000); |
|||
verify(jobStatsService, never()).reportTaskResult(any(), any(), any()); |
|||
assertThat(jobDao.findByTenantIdAndFilter(tenantId, JobFilter.builder().build(), new PageLink(100)).getData()).isEmpty(); |
|||
} |
|||
|
|||
@Test |
|||
public void testSubmitMultipleJobs() throws Exception { |
|||
int tasksCount = 3; |
|||
int jobsCount = 3; |
|||
for (int i = 1; i <= jobsCount; i++) { |
|||
submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(tasksCount) |
|||
.taskProcessingTimeMs(1000) |
|||
.build(), "test-job-" + i); |
|||
} |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
List<Job> jobs = findJobs(List.of(JobType.DUMMY), List.of(jobEntity.getUuidId())); |
|||
assertThat(jobs).hasSize(jobsCount); |
|||
Job firstJob = jobs.get(2); // ordered by createdTime descending
|
|||
assertThat(firstJob.getStatus()).isEqualTo(JobStatus.RUNNING); |
|||
Job secondJob = jobs.get(1); |
|||
assertThat(secondJob.getStatus()).isEqualTo(JobStatus.QUEUED); |
|||
Job thirdJob = jobs.get(0); |
|||
assertThat(thirdJob.getStatus()).isEqualTo(JobStatus.QUEUED); |
|||
}); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
List<Job> jobs = findJobs(); |
|||
for (Job job : jobs) { |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); |
|||
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(tasksCount); |
|||
assertThat(job.getResult().getTotalCount()).isEqualTo(tasksCount); |
|||
assertThat(job.getEntityId()).isEqualTo(jobEntity.getId()); |
|||
assertThat(job.getEntityName()).isEqualTo(jobEntity.getName()); |
|||
} |
|||
}); |
|||
|
|||
doDelete("/api/device/" + jobEntity.getId()).andExpect(status().isOk()); |
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
assertThat(findJobs(List.of(JobType.DUMMY), List.of(jobEntity.getUuidId()))).isEmpty(); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testCancelQueuedJob() throws Exception { |
|||
int tasksCount = 3; |
|||
int jobsCount = 3; |
|||
List<JobId> jobIds = new ArrayList<>(); |
|||
for (int i = 1; i <= jobsCount; i++) { |
|||
Job job = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(tasksCount) |
|||
.taskProcessingTimeMs(1000) |
|||
.build(), "test-job-" + i); |
|||
jobIds.add(job.getId()); |
|||
} |
|||
|
|||
for (int i = 1; i < jobIds.size(); i++) { |
|||
cancelJob(jobIds.get(i)); |
|||
} |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
List<Job> jobs = findJobs(); |
|||
|
|||
Job firstJob = jobs.get(2); |
|||
assertThat(firstJob.getStatus()).isEqualTo(JobStatus.COMPLETED); |
|||
assertThat(firstJob.getResult().getSuccessfulCount()).isEqualTo(tasksCount); |
|||
assertThat(firstJob.getResult().getTotalCount()).isEqualTo(tasksCount); |
|||
|
|||
Job secondJob = jobs.get(1); |
|||
assertThat(secondJob.getStatus()).isEqualTo(JobStatus.CANCELLED); |
|||
assertThat(secondJob.getResult().getCompletedCount()).isZero(); |
|||
|
|||
Job thirdJob = jobs.get(0); |
|||
assertThat(thirdJob.getStatus()).isEqualTo(JobStatus.CANCELLED); |
|||
assertThat(thirdJob.getResult().getCompletedCount()).isZero(); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testSubmitJob_generalError() { |
|||
int submittedTasks = 100; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.generalError("Some error while submitting tasks") |
|||
.submittedTasksBeforeGeneralError(submittedTasks) |
|||
.taskProcessingTimeMs(10) |
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
assertThat(job.getResult().getSuccessfulCount()).isBetween(1, submittedTasks); |
|||
assertThat(job.getResult().getDiscardedCount()).isZero(); |
|||
assertThat(job.getResult().getTotalCount()).isNull(); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testSubmitJob_immediateGeneralError() { |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.generalError("Some error while submitting tasks") |
|||
.submittedTasksBeforeGeneralError(0) |
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
assertThat(job.getResult().getSuccessfulCount()).isZero(); |
|||
assertThat(job.getResult().getDiscardedCount()).isZero(); |
|||
assertThat(job.getResult().getFailedCount()).isZero(); |
|||
assertThat(job.getResult().getTotalCount()).isNull(); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testReprocessJob_generalError() throws Exception { |
|||
int submittedTasks = 100; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.generalError("Some error while submitting tasks") |
|||
.submittedTasksBeforeGeneralError(submittedTasks) |
|||
.taskProcessingTimeMs(10) |
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
assertThat(job.getResult().getGeneralError()).isEqualTo("Some error while submitting tasks"); |
|||
}); |
|||
|
|||
Job savedJob = jobDao.findById(tenantId, jobId.getId()); |
|||
DummyJobConfiguration configuration = savedJob.getConfiguration(); |
|||
configuration.setGeneralError(null); |
|||
configuration.setSuccessfulTasksCount(submittedTasks); |
|||
jobDao.save(tenantId, savedJob); |
|||
|
|||
reprocessJob(jobId); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); |
|||
assertThat(job.getResult().getGeneralError()).isNull(); |
|||
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(submittedTasks); |
|||
assertThat(job.getResult().getTotalCount()).isEqualTo(submittedTasks); |
|||
assertThat(job.getResult().getFailedCount()).isZero(); |
|||
assertThat(job.getResult().getDiscardedCount()).isZero(); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testReprocessJob() throws Exception { |
|||
int successfulTasks = 3; |
|||
int failedTasks = 2; |
|||
int totalTasksCount = successfulTasks + failedTasks; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(successfulTasks) |
|||
.failedTasksCount(failedTasks) |
|||
.errors(List.of("error")) |
|||
.taskProcessingTimeMs(100) |
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
JobResult jobResult = job.getResult(); |
|||
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); |
|||
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks); |
|||
|
|||
List<DummyTaskFailure> failures = getFailures(jobResult); |
|||
for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { |
|||
DummyTaskFailure failure = failures.get(i); |
|||
assertThat(failure.getNumber()).isEqualTo(taskNumber); |
|||
assertThat(failure.getError()).isEqualTo("error"); |
|||
} |
|||
}); |
|||
|
|||
reprocessJob(jobId); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.COMPLETED); |
|||
assertThat(job.getResult().getSuccessfulCount()).isEqualTo(totalTasksCount); |
|||
assertThat(job.getResult().getFailedCount()).isZero(); |
|||
assertThat(job.getResult().getTotalCount()).isEqualTo(totalTasksCount); |
|||
assertThat(job.getResult().getResults()).isEmpty(); |
|||
assertThat(job.getConfiguration().getToReprocess()).isNullOrEmpty(); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testReprocessJob_somePermanentlyFailed() throws Exception { |
|||
int successfulTasks = 3; |
|||
int failedTasks = 2; |
|||
int permanentlyFailedTasks = 1; |
|||
int totalTasksCount = successfulTasks + failedTasks + permanentlyFailedTasks; |
|||
JobId jobId = submitJob(DummyJobConfiguration.builder() |
|||
.successfulTasksCount(successfulTasks) |
|||
.failedTasksCount(failedTasks) |
|||
.permanentlyFailedTasksCount(permanentlyFailedTasks) |
|||
.errors(List.of("error")) |
|||
.taskProcessingTimeMs(100) |
|||
.build()).getId(); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
JobResult jobResult = job.getResult(); |
|||
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks); |
|||
assertThat(jobResult.getFailedCount()).isEqualTo(failedTasks + permanentlyFailedTasks); |
|||
assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); |
|||
|
|||
List<DummyTaskFailure> failures = getFailures(jobResult); |
|||
for (int i = 0, taskNumber = successfulTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { |
|||
DummyTaskFailure failure = failures.get(i); |
|||
assertThat(failure.getNumber()).isEqualTo(taskNumber); |
|||
assertThat(failure.getError()).isEqualTo("error"); |
|||
} |
|||
}); |
|||
|
|||
reprocessJob(jobId); |
|||
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
Job job = findJobById(jobId); |
|||
assertThat(job.getStatus()).isEqualTo(JobStatus.FAILED); |
|||
JobResult jobResult = job.getResult(); |
|||
assertThat(jobResult.getSuccessfulCount()).isEqualTo(successfulTasks + failedTasks); |
|||
assertThat(jobResult.getFailedCount()).isEqualTo(permanentlyFailedTasks); |
|||
assertThat(jobResult.getTotalCount()).isEqualTo(totalTasksCount); |
|||
|
|||
List<DummyTaskFailure> failures = getFailures(jobResult); |
|||
for (int i = 0, taskNumber = successfulTasks + failedTasks + 1; taskNumber <= totalTasksCount; i++, taskNumber++) { |
|||
DummyTaskFailure failure = failures.get(i); |
|||
assertThat(failure.getNumber()).isEqualTo(taskNumber); |
|||
assertThat(failure.getError()).isEqualTo("error"); |
|||
assertThat(failure.isFailAlways()).isTrue(); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private Job submitJob(DummyJobConfiguration configuration) { |
|||
return submitJob(configuration, "test-job"); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
private Job submitJob(DummyJobConfiguration configuration, String key) { |
|||
return jobManager.submitJob(Job.builder() |
|||
.tenantId(tenantId) |
|||
.type(JobType.DUMMY) |
|||
.key(key) |
|||
.entityId(jobEntity.getId()) |
|||
.configuration(configuration) |
|||
.build()).get(); |
|||
} |
|||
|
|||
private List<DummyTaskFailure> getFailures(JobResult jobResult) { |
|||
return jobResult.getResults().stream() |
|||
.map(taskResult -> ((DummyTaskResult) taskResult).getFailure()) |
|||
.sorted(Comparator.comparingInt(DummyTaskFailure::getNumber)) |
|||
.toList(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job; |
|||
|
|||
import org.springframework.test.context.TestPropertySource; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
@DaoSqlTest |
|||
@TestPropertySource(properties = { |
|||
"queue.tasks.stats.processing_interval=0", |
|||
"queue.tasks.partitioning_strategy=entity", |
|||
"queue.tasks.partitions_per_type=DUMMY:100;DUMMY:50" |
|||
}) |
|||
public class JobManagerTest_EntityPartitioningStrategy extends JobManagerTest { |
|||
|
|||
/* |
|||
* Some tests are overridden because they are based on |
|||
* tenant partitioning strategy (subsequent tasks processing within a tenant) |
|||
* */ |
|||
|
|||
@Override |
|||
public void testCancelJob_simulateTaskProcessorRestart() throws Exception { |
|||
} |
|||
|
|||
@Override |
|||
public void testSubmitJob_generalError() { |
|||
|
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.job; |
|||
|
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.service.job.task.DummyTaskProcessor; |
|||
|
|||
@Component |
|||
public class TestTaskProcessor extends DummyTaskProcessor { |
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.job; |
|||
|
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobFilter; |
|||
import org.thingsboard.server.common.data.job.JobStats; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.dao.entity.EntityDaoService; |
|||
|
|||
public interface JobService extends EntityDaoService { |
|||
|
|||
Job saveJob(TenantId tenantId, Job job); |
|||
|
|||
Job findJobById(TenantId tenantId, JobId jobId); |
|||
|
|||
void cancelJob(TenantId tenantId, JobId jobId); |
|||
|
|||
void markAsFailed(TenantId tenantId, JobId jobId, String error); |
|||
|
|||
void processStats(TenantId tenantId, JobId jobId, JobStats jobStats); |
|||
|
|||
PageData<Job> findJobsByFilter(TenantId tenantId, JobFilter filter, PageLink pageLink); |
|||
|
|||
Job findLatestJobByKey(TenantId tenantId, String key); |
|||
|
|||
void deleteJob(TenantId tenantId, JobId jobId); |
|||
|
|||
int deleteJobsByEntityId(TenantId tenantId, EntityId entityId); |
|||
|
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.id; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonCreator; |
|||
import com.fasterxml.jackson.annotation.JsonProperty; |
|||
import io.swagger.v3.oas.annotations.media.Schema; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
public class JobId extends UUIDBased implements EntityId { |
|||
|
|||
@JsonCreator |
|||
public JobId(@JsonProperty("id") UUID id) { |
|||
super(id); |
|||
} |
|||
|
|||
@Schema(requiredMode = Schema.RequiredMode.REQUIRED, description = "string", example = "JOB", allowableValues = "JOB") |
|||
@Override |
|||
public EntityType getEntityType() { |
|||
return EntityType.JOB; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.ToString; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Data |
|||
@EqualsAndHashCode(callSuper = true) |
|||
@AllArgsConstructor |
|||
@NoArgsConstructor |
|||
@Builder |
|||
@ToString(callSuper = true) |
|||
public class DummyJobConfiguration extends JobConfiguration { |
|||
|
|||
private long taskProcessingTimeMs; |
|||
private int successfulTasksCount; |
|||
private int failedTasksCount; |
|||
private int permanentlyFailedTasksCount; |
|||
private List<String> errors; |
|||
private int retries; |
|||
|
|||
private String generalError; |
|||
private int submittedTasksBeforeGeneralError; |
|||
|
|||
@Override |
|||
public JobType getType() { |
|||
return JobType.DUMMY; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job; |
|||
|
|||
public class DummyJobResult extends JobResult { |
|||
|
|||
@Override |
|||
public JobType getJobType() { |
|||
return JobType.DUMMY; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,85 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job; |
|||
|
|||
import jakarta.validation.Valid; |
|||
import jakarta.validation.constraints.NotBlank; |
|||
import jakarta.validation.constraints.NotNull; |
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.ToString; |
|||
import org.thingsboard.server.common.data.BaseData; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.HasTenantId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
|
|||
@Data |
|||
@NoArgsConstructor |
|||
@ToString(callSuper = true) |
|||
@EqualsAndHashCode(callSuper = true) |
|||
public class Job extends BaseData<JobId> implements HasTenantId { |
|||
|
|||
@NotNull |
|||
private TenantId tenantId; |
|||
@NotNull |
|||
private JobType type; |
|||
@NotBlank |
|||
private String key; |
|||
@NotNull |
|||
private EntityId entityId; |
|||
private String entityName; // read-only
|
|||
@NotNull |
|||
private JobStatus status; |
|||
@NotNull |
|||
@Valid |
|||
private JobConfiguration configuration; |
|||
@NotNull |
|||
private JobResult result; |
|||
|
|||
public static final Set<EntityType> SUPPORTED_ENTITY_TYPES = Set.of( |
|||
EntityType.DEVICE, EntityType.ASSET, EntityType.DEVICE_PROFILE, EntityType.ASSET_PROFILE |
|||
); |
|||
|
|||
@Builder(toBuilder = true) |
|||
public Job(TenantId tenantId, JobType type, String key, EntityId entityId, JobConfiguration configuration) { |
|||
this.tenantId = tenantId; |
|||
this.type = type; |
|||
this.key = key; |
|||
this.entityId = entityId; |
|||
this.configuration = configuration; |
|||
this.configuration.setTasksKey(UUID.randomUUID().toString()); |
|||
presetResult(); |
|||
} |
|||
|
|||
public void presetResult() { |
|||
this.result = switch (type) { |
|||
case DUMMY -> new DummyJobResult(); |
|||
}; |
|||
} |
|||
|
|||
@SuppressWarnings("unchecked") |
|||
public <C extends JobConfiguration> C getConfiguration() { |
|||
return (C) configuration; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonIgnore; |
|||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type; |
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo; |
|||
import jakarta.validation.constraints.NotBlank; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.List; |
|||
|
|||
@JsonIgnoreProperties(ignoreUnknown = true) |
|||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") |
|||
@JsonSubTypes({ |
|||
@Type(name = "DUMMY", value = DummyJobConfiguration.class), |
|||
}) |
|||
@Data |
|||
public abstract class JobConfiguration implements Serializable { |
|||
|
|||
@NotBlank |
|||
private String tasksKey; // internal
|
|||
private List<TaskResult> toReprocess; // internal
|
|||
|
|||
@JsonIgnore |
|||
public abstract JobType getType(); |
|||
|
|||
} |
|||
@ -0,0 +1,72 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonIgnore; |
|||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type; |
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
@JsonIgnoreProperties(ignoreUnknown = true) |
|||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") |
|||
@JsonSubTypes({ |
|||
@Type(name = "DUMMY", value = DummyJobResult.class) |
|||
}) |
|||
@Data |
|||
@NoArgsConstructor |
|||
public abstract class JobResult implements Serializable { |
|||
|
|||
private int successfulCount; |
|||
private int failedCount; |
|||
private int discardedCount; |
|||
private Integer totalCount = null; // set when all tasks are submitted
|
|||
private List<TaskResult> results = new ArrayList<>(); |
|||
private String generalError; |
|||
|
|||
private long startTs; |
|||
private long finishTs; |
|||
private long cancellationTs; |
|||
|
|||
@JsonIgnore |
|||
public int getCompletedCount() { |
|||
return successfulCount + failedCount + discardedCount; |
|||
} |
|||
|
|||
public void processTaskResult(TaskResult taskResult) { |
|||
if (taskResult.isSuccess()) { |
|||
successfulCount++; |
|||
} else if (taskResult.isDiscarded()) { |
|||
discardedCount++; |
|||
} else { |
|||
failedCount++; |
|||
if (results.size() < 100) { // preserving only first 100 errors, not reprocessing if there are more failures
|
|||
results.add(taskResult); |
|||
} |
|||
} |
|||
} |
|||
|
|||
@JsonIgnore |
|||
public abstract JobType getJobType(); |
|||
|
|||
} |
|||
@ -0,0 +1,39 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job; |
|||
|
|||
public enum JobStatus { |
|||
|
|||
QUEUED, |
|||
PENDING, |
|||
RUNNING, |
|||
COMPLETED, |
|||
FAILED, |
|||
CANCELLED; |
|||
|
|||
public boolean isOneOf(JobStatus... statuses) { |
|||
if (statuses == null) { |
|||
return false; |
|||
} |
|||
for (JobStatus status : statuses) { |
|||
if (this == status) { |
|||
return true; |
|||
} |
|||
} |
|||
return false; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.RequiredArgsConstructor; |
|||
|
|||
@RequiredArgsConstructor |
|||
@Getter |
|||
public enum JobType { |
|||
|
|||
DUMMY("Dummy job"); |
|||
|
|||
private final String title; |
|||
|
|||
public String getTasksTopic() { |
|||
return "tasks." + name().toLowerCase(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,62 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job.task; |
|||
|
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.ToString; |
|||
import lombok.experimental.SuperBuilder; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
|
|||
import java.util.List; |
|||
import java.util.UUID; |
|||
|
|||
@Data |
|||
@NoArgsConstructor |
|||
@EqualsAndHashCode(callSuper = true) |
|||
@SuperBuilder |
|||
@ToString(callSuper = true) |
|||
public class DummyTask extends Task<DummyTaskResult> { |
|||
|
|||
private int number; |
|||
private long processingTimeMs; |
|||
private List<String> errors; // errors for each attempt
|
|||
private boolean failAlways; |
|||
|
|||
@Override |
|||
public DummyTaskResult toFailed(Throwable error) { |
|||
return DummyTaskResult.failed(this, error); |
|||
} |
|||
|
|||
@Override |
|||
public DummyTaskResult toDiscarded() { |
|||
return DummyTaskResult.discarded(this); |
|||
} |
|||
|
|||
@Override |
|||
public EntityId getEntityId() { |
|||
return new DeviceId(UUID.randomUUID()); |
|||
} |
|||
|
|||
@Override |
|||
public JobType getJobType() { |
|||
return JobType.DUMMY; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,75 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job.task; |
|||
|
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.ToString; |
|||
import lombok.experimental.SuperBuilder; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
|
|||
@Data |
|||
@EqualsAndHashCode(callSuper = true) |
|||
@NoArgsConstructor |
|||
@SuperBuilder |
|||
@ToString(callSuper = true) |
|||
public class DummyTaskResult extends TaskResult { |
|||
|
|||
private DummyTaskFailure failure; |
|||
|
|||
public static DummyTaskResult success(DummyTask task) { |
|||
return DummyTaskResult.builder() |
|||
.key(task.getKey()) |
|||
.success(true) |
|||
.build(); |
|||
} |
|||
|
|||
public static DummyTaskResult failed(DummyTask task, Throwable error) { |
|||
return DummyTaskResult.builder() |
|||
.key(task.getKey()) |
|||
.failure(DummyTaskFailure.builder() |
|||
.error(error.getMessage()) |
|||
.number(task.getNumber()) |
|||
.failAlways(task.isFailAlways()) |
|||
.build()) |
|||
.build(); |
|||
} |
|||
|
|||
public static DummyTaskResult discarded(DummyTask task) { |
|||
return DummyTaskResult.builder() |
|||
.key(task.getKey()) |
|||
.discarded(true) |
|||
.build(); |
|||
} |
|||
|
|||
@Override |
|||
public JobType getJobType() { |
|||
return JobType.DUMMY; |
|||
} |
|||
|
|||
@Data |
|||
@NoArgsConstructor |
|||
@EqualsAndHashCode(callSuper = true) |
|||
@SuperBuilder |
|||
public static class DummyTaskFailure extends TaskFailure { |
|||
|
|||
private int number; |
|||
private boolean failAlways; |
|||
|
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,61 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job.task; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonIgnore; |
|||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type; |
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import lombok.experimental.SuperBuilder; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
|
|||
@Data |
|||
@JsonIgnoreProperties(ignoreUnknown = true) |
|||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") |
|||
@JsonSubTypes({ |
|||
@Type(name = "DUMMY", value = DummyTask.class) |
|||
}) |
|||
@SuperBuilder |
|||
@AllArgsConstructor |
|||
public abstract class Task<R extends TaskResult> { |
|||
|
|||
private TenantId tenantId; |
|||
private JobId jobId; |
|||
private String key; |
|||
private int retries; |
|||
|
|||
public Task() { |
|||
} |
|||
|
|||
private int attempt = 0; |
|||
|
|||
public abstract R toFailed(Throwable error); |
|||
|
|||
public abstract R toDiscarded(); |
|||
|
|||
@JsonIgnore |
|||
public abstract EntityId getEntityId(); |
|||
|
|||
@JsonIgnore |
|||
public abstract JobType getJobType(); |
|||
|
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job.task; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.experimental.SuperBuilder; |
|||
|
|||
@Data |
|||
@AllArgsConstructor |
|||
@NoArgsConstructor |
|||
@SuperBuilder |
|||
public abstract class TaskFailure { |
|||
|
|||
private String error; |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.job.task; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonIgnore; |
|||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes; |
|||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type; |
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.experimental.SuperBuilder; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
|
|||
@Data |
|||
@AllArgsConstructor |
|||
@NoArgsConstructor |
|||
@SuperBuilder |
|||
@JsonIgnoreProperties(ignoreUnknown = true) |
|||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType") |
|||
@JsonSubTypes({ |
|||
@Type(name = "DUMMY", value = DummyTaskResult.class) |
|||
}) |
|||
public abstract class TaskResult { |
|||
|
|||
private String key; |
|||
private boolean success; |
|||
private boolean discarded; |
|||
|
|||
@JsonIgnore |
|||
public abstract JobType getJobType(); |
|||
|
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.settings; |
|||
|
|||
import lombok.Getter; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Getter |
|||
@Component |
|||
public class TasksQueueConfig { |
|||
|
|||
@Value("${queue.tasks.poll_interval:500}") |
|||
private int pollInterval; |
|||
|
|||
@Value("${queue.tasks.partitioning_strategy:tenant}") |
|||
private String partitioningStrategy; |
|||
|
|||
@Value("${queue.tasks.stats.topic:jobs.stats}") |
|||
private String statsTopic; |
|||
|
|||
@Value("${queue.tasks.stats.poll_interval:500}") |
|||
private int statsPollInterval; |
|||
|
|||
@Value("${queue.tasks.stats.processing_interval:1000}") |
|||
private int statsProcessingInterval; |
|||
|
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.memory.InMemoryStorage; |
|||
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; |
|||
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; |
|||
import org.thingsboard.server.queue.settings.TasksQueueConfig; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}'=='in-memory'") |
|||
@RequiredArgsConstructor |
|||
public class InMemoryTaskProcessorQueueFactory implements TaskProcessorQueueFactory { |
|||
|
|||
private final InMemoryStorage storage; |
|||
private final TasksQueueConfig tasksQueueConfig; |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) { |
|||
return new InMemoryTbQueueConsumer<>(storage, jobType.getTasksTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() { |
|||
return new InMemoryTbQueueProducer<>(storage, tasksQueueConfig.getStatsTopic()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.memory.InMemoryStorage; |
|||
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}' == 'in-memory'") |
|||
@RequiredArgsConstructor |
|||
public class InMemoryTaskProducerQueueFactory implements TaskProducerQueueFactory { |
|||
|
|||
private final InMemoryStorage storage; |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType) { |
|||
return new InMemoryTbQueueProducer<>(storage, jobType.getTasksTopic()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,67 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskResultProto; |
|||
import org.thingsboard.server.queue.TbQueueCallback; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
|
|||
@Lazy |
|||
@Service |
|||
@Slf4j |
|||
public class JobStatsService { |
|||
|
|||
private final TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> producer; |
|||
|
|||
public JobStatsService(TaskProcessorQueueFactory queueFactory) { |
|||
this.producer = queueFactory.createJobStatsProducer(); |
|||
} |
|||
|
|||
public void reportTaskResult(TenantId tenantId, JobId jobId, TaskResult result) { |
|||
report(tenantId, jobId, JobStatsMsg.newBuilder() |
|||
.setTaskResult(TaskResultProto.newBuilder() |
|||
.setValue(JacksonUtil.toString(result)) |
|||
.build())); |
|||
} |
|||
|
|||
public void reportAllTasksSubmitted(TenantId tenantId, JobId jobId, int tasksCount) { |
|||
report(tenantId, jobId, JobStatsMsg.newBuilder() |
|||
.setTotalTasksCount(tasksCount)); |
|||
} |
|||
|
|||
private void report(TenantId tenantId, JobId jobId, JobStatsMsg.Builder statsMsg) { |
|||
log.debug("[{}][{}] Reporting: {}", tenantId, jobId, statsMsg); |
|||
statsMsg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
|||
.setJobIdMSB(jobId.getId().getMostSignificantBits()) |
|||
.setJobIdLSB(jobId.getId().getLeastSignificantBits()); |
|||
|
|||
// using job id as msg key so that all stats for a certain job are submitted to the same partition
|
|||
TbProtoQueueMsg<JobStatsMsg> msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build()); |
|||
producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, TbQueueCallback.EMPTY); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,86 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.discovery.TopicService; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
|||
import org.thingsboard.server.queue.settings.TasksQueueConfig; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}'=='kafka'") |
|||
public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory { |
|||
|
|||
private final TopicService topicService; |
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final TasksQueueConfig tasksQueueConfig; |
|||
private final TbKafkaSettings kafkaSettings; |
|||
private final TbKafkaConsumerStatsService consumerStatsService; |
|||
|
|||
private final TbQueueAdmin tasksAdmin; |
|||
|
|||
public KafkaTaskProcessorQueueFactory(TopicService topicService, |
|||
TbServiceInfoProvider serviceInfoProvider, |
|||
TasksQueueConfig tasksQueueConfig, |
|||
TbKafkaSettings kafkaSettings, |
|||
TbKafkaConsumerStatsService consumerStatsService, |
|||
TbKafkaTopicConfigs kafkaTopicConfigs) { |
|||
this.topicService = topicService; |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
this.tasksQueueConfig = tasksQueueConfig; |
|||
this.kafkaSettings = kafkaSettings; |
|||
this.consumerStatsService = consumerStatsService; |
|||
this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) { |
|||
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TaskProto>>builder() |
|||
.settings(kafkaSettings) |
|||
.topic(topicService.buildTopicName(jobType.getTasksTopic())) |
|||
.clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId()) |
|||
.groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group")) |
|||
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders())) |
|||
.admin(tasksAdmin) |
|||
.statsService(consumerStatsService) |
|||
.build(); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() { |
|||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder() |
|||
.clientId("job-stats-producer-" + serviceInfoProvider.getServiceId()) |
|||
.defaultTopic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic())) |
|||
.settings(kafkaSettings) |
|||
.admin(tasksAdmin) |
|||
.build(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,62 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.discovery.TopicService; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaAdmin; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
|||
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}' == 'kafka' && ('${service.type:null}' == 'monolith' || " + |
|||
"'${service.type:null}' == 'tb-core' || '${service.type:null}' == 'tb-rule-engine')") |
|||
public class KafkaTaskProducerQueueFactory implements TaskProducerQueueFactory { |
|||
|
|||
private final TopicService topicService; |
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final TbKafkaSettings kafkaSettings; |
|||
private final TbQueueAdmin tasksAdmin; |
|||
|
|||
KafkaTaskProducerQueueFactory(TopicService topicService, |
|||
TbServiceInfoProvider serviceInfoProvider, |
|||
TbKafkaSettings kafkaSettings, |
|||
TbKafkaTopicConfigs kafkaTopicConfigs) { |
|||
this.topicService = topicService; |
|||
this.kafkaSettings = kafkaSettings; |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType) { |
|||
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TaskProto>>builder() |
|||
.clientId(jobType.name().toLowerCase() + "-task-producer-" + serviceInfoProvider.getServiceId()) |
|||
.defaultTopic(topicService.buildTopicName(jobType.getTasksTopic())) |
|||
.settings(kafkaSettings) |
|||
.admin(tasksAdmin) |
|||
.build(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,222 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.annotation.PreDestroy; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.event.EventListener; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.common.util.SetCache; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.job.task.Task; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|||
import org.thingsboard.server.common.data.queue.QueueConfig; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; |
|||
import org.thingsboard.server.queue.discovery.QueueKey; |
|||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; |
|||
import org.thingsboard.server.queue.settings.TasksQueueConfig; |
|||
|
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.Future; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.TimeoutException; |
|||
|
|||
public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|||
|
|||
protected final Logger log = LoggerFactory.getLogger(getClass()); |
|||
|
|||
@Autowired |
|||
private TaskProcessorQueueFactory queueFactory; |
|||
@Autowired |
|||
private JobStatsService statsService; |
|||
@Autowired |
|||
private TaskProcessorExecutors executors; |
|||
@Autowired |
|||
private TasksQueueConfig config; |
|||
|
|||
private QueueKey queueKey; |
|||
private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer; |
|||
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor")); |
|||
|
|||
private final SetCache<String> discarded = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); |
|||
private final SetCache<String> failed = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); |
|||
|
|||
private final SetCache<UUID> deletedTenants = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, getJobType().name()); |
|||
taskConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<TaskProto>, QueueConfig>builder() |
|||
.queueKey(queueKey) |
|||
.config(QueueConfig.of(true, config.getPollInterval())) |
|||
.msgPackProcessor(this::processMsgs) |
|||
.consumerCreator((queueConfig, tpi) -> queueFactory.createTaskConsumer(getJobType())) |
|||
.consumerExecutor(executors.getConsumersExecutor()) |
|||
.scheduler(executors.getScheduler()) |
|||
.taskExecutor(executors.getMgmtExecutor()) |
|||
.build(); |
|||
} |
|||
|
|||
@EventListener |
|||
public void onPartitionChangeEvent(PartitionChangeEvent event) { |
|||
if (event.getServiceType() == ServiceType.TASK_PROCESSOR) { |
|||
Set<TopicPartitionInfo> partitions = event.getNewPartitions().get(queueKey); |
|||
taskConsumer.update(partitions); |
|||
} |
|||
} |
|||
|
|||
@EventListener |
|||
public void onComponentLifecycle(ComponentLifecycleMsg event) { |
|||
EntityId entityId = event.getEntityId(); |
|||
switch (entityId.getEntityType()) { |
|||
case JOB -> { |
|||
String tasksKey = event.getInfo().get("tasksKey").asText(); |
|||
if (event.getEvent() == ComponentLifecycleEvent.STOPPED) { |
|||
log.info("Adding job {} ({}) to discarded", entityId, tasksKey); |
|||
addToDiscarded(tasksKey); |
|||
} else if (event.getEvent() == ComponentLifecycleEvent.FAILED) { |
|||
log.info("Adding job {} ({}) to failed", entityId, tasksKey); |
|||
failed.add(tasksKey); |
|||
} |
|||
} |
|||
case TENANT -> { |
|||
if (event.getEvent() == ComponentLifecycleEvent.DELETED) { |
|||
deletedTenants.add(entityId.getId()); |
|||
log.info("Adding tenant {} to deleted", entityId); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer, QueueConfig queueConfig) throws Exception { |
|||
for (TbProtoQueueMsg<TaskProto> msg : msgs) { |
|||
try { |
|||
@SuppressWarnings("unchecked") |
|||
T task = (T) JacksonUtil.fromString(msg.getValue().getValue(), Task.class); |
|||
if (discarded.contains(task.getKey())) { |
|||
log.debug("Skipping task for discarded job {}: {}", task.getJobId(), task); |
|||
reportTaskDiscarded(task); |
|||
continue; |
|||
} else if (failed.contains(task.getKey())) { |
|||
log.debug("Skipping task for failed job {}: {}", task.getJobId(), task); |
|||
continue; |
|||
} else if (deletedTenants.contains(task.getTenantId().getId())) { |
|||
log.debug("Skipping task for deleted tenant {}: {}", task.getTenantId(), task); |
|||
continue; |
|||
} |
|||
|
|||
processTask(task); |
|||
} catch (InterruptedException e) { |
|||
throw e; |
|||
} catch (Exception e) { |
|||
log.error("Failed to process msg: {}", msg, e); |
|||
} |
|||
} |
|||
consumer.commit(); |
|||
} |
|||
|
|||
private void processTask(T task) throws InterruptedException { |
|||
task.setAttempt(task.getAttempt() + 1); |
|||
log.debug("Processing task: {}", task); |
|||
Future<R> future = null; |
|||
try { |
|||
long startNs = System.nanoTime(); |
|||
future = taskExecutor.submit(() -> process(task)); |
|||
R result; |
|||
try { |
|||
result = future.get(getTaskProcessingTimeout(), TimeUnit.MILLISECONDS); |
|||
} catch (ExecutionException e) { |
|||
throw e.getCause(); |
|||
} catch (TimeoutException e) { |
|||
throw new TimeoutException("Timeout after " + getTaskProcessingTimeout() + " ms"); |
|||
} |
|||
long timingNs = System.nanoTime() - startNs; |
|||
log.info("Processed task in {} ms: {}", timingNs / 1000000.0, task); |
|||
reportTaskResult(task, result); |
|||
} catch (InterruptedException e) { |
|||
throw e; |
|||
} catch (Throwable e) { |
|||
log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e); |
|||
if (task.getAttempt() <= task.getRetries()) { |
|||
processTask(task); |
|||
} else { |
|||
reportTaskFailure(task, e); |
|||
} |
|||
} finally { |
|||
if (future != null && !future.isDone()) { |
|||
future.cancel(true); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public abstract R process(T task) throws Exception; |
|||
|
|||
private void reportTaskFailure(T task, Throwable error) { |
|||
R taskResult = task.toFailed(error); |
|||
reportTaskResult(task, taskResult); |
|||
} |
|||
|
|||
private void reportTaskDiscarded(T task) { |
|||
R taskResult = task.toDiscarded(); |
|||
reportTaskResult(task, taskResult); |
|||
} |
|||
|
|||
private void reportTaskResult(T task, R result) { |
|||
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result); |
|||
} |
|||
|
|||
public void addToDiscarded(String tasksKey) { |
|||
discarded.add(tasksKey); |
|||
} |
|||
|
|||
protected <V> V wait(Future<V> future) throws Exception { |
|||
try { |
|||
return future.get(); // will be interrupted after task processing timeout
|
|||
} catch (InterruptedException e) { |
|||
future.cancel(true); // interrupting the underlying task
|
|||
throw e; |
|||
} |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void destroy() { |
|||
taskConsumer.stop(); |
|||
taskConsumer.awaitStop(); |
|||
taskExecutor.shutdownNow(); |
|||
} |
|||
|
|||
public abstract long getTaskProcessingTimeout(); |
|||
|
|||
public abstract JobType getJobType(); |
|||
|
|||
} |
|||
@ -0,0 +1,59 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.annotation.PreDestroy; |
|||
import lombok.Getter; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.common.util.ThingsBoardExecutors; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
|
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
|
|||
@Getter |
|||
@Lazy |
|||
@Component |
|||
public class TaskProcessorExecutors { |
|||
|
|||
private ExecutorService consumersExecutor; |
|||
private ExecutorService mgmtExecutor; |
|||
private ScheduledExecutorService scheduler; |
|||
|
|||
@PostConstruct |
|||
private void init() { |
|||
consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("task-consumer")); |
|||
mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "task-consumer-mgmt"); |
|||
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("task-consumer-scheduler"); |
|||
} |
|||
|
|||
@PreDestroy |
|||
private void destroy() { |
|||
if (consumersExecutor != null) { |
|||
consumersExecutor.shutdownNow(); |
|||
} |
|||
if (mgmtExecutor != null) { |
|||
mgmtExecutor.shutdownNow(); |
|||
} |
|||
if (scheduler != null) { |
|||
scheduler.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
|
|||
public interface TaskProcessorQueueFactory { |
|||
|
|||
TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType); |
|||
|
|||
TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer(); |
|||
|
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.task; |
|||
|
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
|
|||
public interface TaskProducerQueueFactory { |
|||
|
|||
TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType); |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.common.util; |
|||
|
|||
import com.github.benmanes.caffeine.cache.Cache; |
|||
import com.github.benmanes.caffeine.cache.Caffeine; |
|||
|
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
public class SetCache<K> { |
|||
|
|||
private static final Object DUMMY_VALUE = Boolean.TRUE; |
|||
|
|||
private final Cache<K, Object> cache; |
|||
|
|||
public SetCache(long valueTtlMs) { |
|||
this.cache = Caffeine.newBuilder() |
|||
.expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS) |
|||
.build(); |
|||
} |
|||
|
|||
public void add(K key) { |
|||
cache.put(key, DUMMY_VALUE); |
|||
} |
|||
|
|||
public boolean contains(K key) { |
|||
return cache.asMap().containsKey(key); |
|||
} |
|||
|
|||
public void remove(K key) { |
|||
cache.invalidate(key); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,257 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.job; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
import org.thingsboard.server.common.data.EntityInfo; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.HasId; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobFilter; |
|||
import org.thingsboard.server.common.data.job.JobResult; |
|||
import org.thingsboard.server.common.data.job.JobStats; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.job.task.TaskResult; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.dao.entity.AbstractEntityService; |
|||
import org.thingsboard.server.dao.entity.EntityService; |
|||
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; |
|||
import org.thingsboard.server.dao.service.ConstraintValidator; |
|||
|
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
import java.util.Set; |
|||
import java.util.stream.Collectors; |
|||
|
|||
import static org.thingsboard.server.common.data.job.JobStatus.CANCELLED; |
|||
import static org.thingsboard.server.common.data.job.JobStatus.COMPLETED; |
|||
import static org.thingsboard.server.common.data.job.JobStatus.FAILED; |
|||
import static org.thingsboard.server.common.data.job.JobStatus.PENDING; |
|||
import static org.thingsboard.server.common.data.job.JobStatus.QUEUED; |
|||
import static org.thingsboard.server.common.data.job.JobStatus.RUNNING; |
|||
|
|||
@Service |
|||
@RequiredArgsConstructor |
|||
@Slf4j |
|||
public class DefaultJobService extends AbstractEntityService implements JobService { |
|||
|
|||
private final JobDao jobDao; |
|||
private final EntityService entityService; |
|||
|
|||
@Transactional |
|||
@Override |
|||
public Job saveJob(TenantId tenantId, Job job) { |
|||
if (jobDao.existsByTenantAndKeyAndStatusOneOf(tenantId, job.getKey(), QUEUED, PENDING, RUNNING)) { |
|||
throw new IllegalArgumentException("The same job is already queued or running"); |
|||
} |
|||
if (jobDao.existsByTenantIdAndTypeAndStatusOneOf(tenantId, job.getType(), PENDING, RUNNING)) { |
|||
job.setStatus(QUEUED); |
|||
} else { |
|||
job.setStatus(PENDING); |
|||
job.getResult().setStartTs(System.currentTimeMillis()); |
|||
} |
|||
return saveJob(tenantId, job, true, null); |
|||
} |
|||
|
|||
@Override |
|||
public Job findJobById(TenantId tenantId, JobId jobId) { |
|||
return jobDao.findById(tenantId, jobId.getId()); |
|||
} |
|||
|
|||
@Transactional |
|||
@Override |
|||
public void cancelJob(TenantId tenantId, JobId jobId) { |
|||
Job job = findForUpdate(tenantId, jobId); |
|||
if (!job.getStatus().isOneOf(QUEUED, PENDING, RUNNING)) { |
|||
throw new IllegalArgumentException("Job already " + job.getStatus().name().toLowerCase()); |
|||
} |
|||
job.getResult().setCancellationTs(System.currentTimeMillis()); |
|||
JobStatus prevStatus = job.getStatus(); |
|||
if (job.getStatus() == QUEUED) { |
|||
job.setStatus(CANCELLED); // setting cancelled status right away, because we don't expect stats for cancelled tasks
|
|||
} else if (job.getStatus() == PENDING) { |
|||
job.setStatus(RUNNING); |
|||
} |
|||
saveJob(tenantId, job, true, prevStatus); |
|||
} |
|||
|
|||
@Transactional |
|||
@Override |
|||
public void markAsFailed(TenantId tenantId, JobId jobId, String error) { |
|||
Job job = findForUpdate(tenantId, jobId); |
|||
job.getResult().setGeneralError(error); |
|||
JobStatus prevStatus = job.getStatus(); |
|||
job.setStatus(FAILED); |
|||
saveJob(tenantId, job, true, prevStatus); |
|||
} |
|||
|
|||
@Transactional |
|||
@Override |
|||
public void processStats(TenantId tenantId, JobId jobId, JobStats jobStats) { |
|||
Job job = findForUpdate(tenantId, jobId); |
|||
if (job == null) { |
|||
log.debug("[{}][{}] Got stale stats: {}", tenantId, jobId, jobStats); |
|||
return; |
|||
} |
|||
JobStatus prevStatus = job.getStatus(); |
|||
if (job.getStatus() == PENDING) { |
|||
job.setStatus(RUNNING); |
|||
} |
|||
|
|||
JobResult result = job.getResult(); |
|||
if (jobStats.getTotalTasksCount() != null) { |
|||
result.setTotalCount(jobStats.getTotalTasksCount()); |
|||
} |
|||
|
|||
boolean publishEvent = false; |
|||
for (TaskResult taskResult : jobStats.getTaskResults()) { |
|||
if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) { |
|||
log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey()); |
|||
continue; |
|||
} |
|||
|
|||
result.processTaskResult(taskResult); |
|||
|
|||
if (result.getCancellationTs() > 0) { |
|||
if (!taskResult.isDiscarded() && System.currentTimeMillis() > result.getCancellationTs()) { |
|||
log.info("Got task result for cancelled job {}: {}, re-notifying processors about cancellation", jobId, taskResult); |
|||
// task processor forgot the task is cancelled
|
|||
publishEvent = true; |
|||
} |
|||
} |
|||
} |
|||
|
|||
if (job.getStatus() == RUNNING) { |
|||
if (result.getTotalCount() != null && result.getCompletedCount() >= result.getTotalCount()) { |
|||
if (result.getCancellationTs() > 0) { |
|||
job.setStatus(CANCELLED); |
|||
} else if (result.getFailedCount() > 0) { |
|||
job.setStatus(FAILED); |
|||
publishEvent = true; |
|||
} else { |
|||
job.setStatus(COMPLETED); |
|||
publishEvent = true; |
|||
} |
|||
result.setFinishTs(System.currentTimeMillis()); |
|||
job.getConfiguration().setToReprocess(null); |
|||
} |
|||
} |
|||
|
|||
saveJob(tenantId, job, publishEvent, prevStatus); |
|||
} |
|||
|
|||
private Job saveJob(TenantId tenantId, Job job, boolean publishEvent, JobStatus prevStatus) { |
|||
ConstraintValidator.validateFields(job); |
|||
if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) { |
|||
throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType()); |
|||
} |
|||
|
|||
job = jobDao.save(tenantId, job); |
|||
if (publishEvent) { |
|||
eventPublisher.publishEvent(SaveEntityEvent.builder() |
|||
.tenantId(tenantId) |
|||
.entityId(job.getId()) |
|||
.entity(job) |
|||
.build()); |
|||
} |
|||
log.info("[{}] Saved job: {}", tenantId, job); |
|||
if (prevStatus != null && job.getStatus() != prevStatus) { |
|||
log.info("[{}][{}][{}] New job status: {} -> {}", tenantId, job.getId(), job.getType(), prevStatus, job.getStatus()); |
|||
if (job.getStatus().isOneOf(CANCELLED, COMPLETED, FAILED) && prevStatus != QUEUED) { // if prev status is QUEUED - means there are already running jobs with this type, no need to check for waiting job
|
|||
checkWaitingJobs(tenantId, job.getType()); |
|||
} |
|||
} |
|||
return job; |
|||
} |
|||
|
|||
private void checkWaitingJobs(TenantId tenantId, JobType jobType) { |
|||
Job queuedJob = jobDao.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId, jobType, QUEUED); |
|||
if (queuedJob == null) { |
|||
return; |
|||
} |
|||
queuedJob.setStatus(PENDING); |
|||
saveJob(tenantId, queuedJob, true, QUEUED); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Job> findJobsByFilter(TenantId tenantId, JobFilter filter, PageLink pageLink) { |
|||
PageData<Job> jobs = jobDao.findByTenantIdAndFilter(tenantId, filter, pageLink); |
|||
|
|||
Set<EntityId> entityIds = jobs.getData().stream() |
|||
.map(Job::getEntityId) |
|||
.collect(Collectors.toSet()); |
|||
Map<EntityId, EntityInfo> entityInfos = entityService.fetchEntityInfos(tenantId, null, entityIds); |
|||
jobs.getData().forEach(job -> { |
|||
EntityInfo entityInfo = entityInfos.get(job.getEntityId()); |
|||
if (entityInfo != null) { |
|||
job.setEntityName(entityInfo.getName()); |
|||
} |
|||
}); |
|||
return jobs; |
|||
} |
|||
|
|||
@Override |
|||
public Job findLatestJobByKey(TenantId tenantId, String key) { |
|||
return jobDao.findLatestByTenantIdAndKey(tenantId, key); |
|||
} |
|||
|
|||
@Override |
|||
public void deleteJob(TenantId tenantId, JobId jobId) { |
|||
Job job = findJobById(tenantId, jobId); |
|||
if (!job.getStatus().isOneOf(CANCELLED, COMPLETED, FAILED)) { |
|||
throw new IllegalArgumentException("Job must be cancelled, completed or failed"); |
|||
} |
|||
jobDao.removeById(tenantId, jobId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public int deleteJobsByEntityId(TenantId tenantId, EntityId entityId) { // TODO: cancel all jobs for this entity
|
|||
return jobDao.removeByEntityId(tenantId, entityId); |
|||
} |
|||
|
|||
private Job findForUpdate(TenantId tenantId, JobId jobId) { |
|||
return jobDao.findByIdForUpdate(tenantId, jobId); |
|||
} |
|||
|
|||
@Override |
|||
public Optional<HasId<?>> findEntity(TenantId tenantId, EntityId entityId) { |
|||
return Optional.ofNullable(findJobById(tenantId, (JobId) entityId)); |
|||
} |
|||
|
|||
@Override |
|||
public void deleteEntity(TenantId tenantId, EntityId id, boolean force) { |
|||
jobDao.removeById(tenantId, id.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public void deleteByTenantId(TenantId tenantId) { |
|||
jobDao.removeByTenantId(tenantId); |
|||
} |
|||
|
|||
@Override |
|||
public EntityType getEntityType() { |
|||
return EntityType.JOB; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.job; |
|||
|
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobFilter; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.dao.Dao; |
|||
|
|||
public interface JobDao extends Dao<Job> { |
|||
|
|||
PageData<Job> findByTenantIdAndFilter(TenantId tenantId, JobFilter filter, PageLink pageLink); |
|||
|
|||
Job findByIdForUpdate(TenantId tenantId, JobId jobId); |
|||
|
|||
Job findLatestByTenantIdAndKey(TenantId tenantId, String key); |
|||
|
|||
boolean existsByTenantAndKeyAndStatusOneOf(TenantId tenantId, String key, JobStatus... statuses); |
|||
|
|||
boolean existsByTenantIdAndTypeAndStatusOneOf(TenantId tenantId, JobType type, JobStatus... statuses); |
|||
|
|||
boolean existsByTenantIdAndEntityIdAndStatusOneOf(TenantId tenantId, EntityId entityId, JobStatus... statuses); |
|||
|
|||
Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status); |
|||
|
|||
void removeByTenantId(TenantId tenantId); |
|||
|
|||
int removeByEntityId(TenantId tenantId, EntityId entityId); |
|||
|
|||
} |
|||
@ -0,0 +1,105 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.model.sql; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import jakarta.persistence.Column; |
|||
import jakarta.persistence.Convert; |
|||
import jakarta.persistence.Entity; |
|||
import jakarta.persistence.EnumType; |
|||
import jakarta.persistence.Enumerated; |
|||
import jakarta.persistence.Table; |
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.NoArgsConstructor; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobConfiguration; |
|||
import org.thingsboard.server.common.data.job.JobResult; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.dao.model.BaseSqlEntity; |
|||
import org.thingsboard.server.dao.model.ModelConstants; |
|||
import org.thingsboard.server.dao.util.mapping.JsonConverter; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
@Data |
|||
@EqualsAndHashCode(callSuper = true) |
|||
@NoArgsConstructor |
|||
@Entity |
|||
@Table(name = ModelConstants.JOB_TABLE_NAME) |
|||
public class JobEntity extends BaseSqlEntity<Job> { |
|||
|
|||
@Column(name = ModelConstants.TENANT_ID_PROPERTY, nullable = false) |
|||
private UUID tenantId; |
|||
|
|||
@Enumerated(EnumType.STRING) |
|||
@Column(name = ModelConstants.JOB_TYPE_PROPERTY, nullable = false) |
|||
private JobType type; |
|||
|
|||
@Column(name = ModelConstants.JOB_KEY_PROPERTY, nullable = false) |
|||
private String key; |
|||
|
|||
@Column(name = ModelConstants.JOB_ENTITY_ID_PROPERTY, nullable = false) |
|||
private UUID entityId; |
|||
|
|||
@Enumerated(EnumType.STRING) |
|||
@Column(name = ModelConstants.JOB_ENTITY_TYPE_PROPERTY, nullable = false) |
|||
private EntityType entityType; |
|||
|
|||
@Enumerated(EnumType.STRING) |
|||
@Column(name = ModelConstants.JOB_STATUS_PROPERTY, nullable = false) |
|||
private JobStatus status; |
|||
|
|||
@Convert(converter = JsonConverter.class) |
|||
@Column(name = ModelConstants.JOB_CONFIGURATION_PROPERTY, nullable = false) |
|||
private JsonNode configuration; |
|||
|
|||
@Convert(converter = JsonConverter.class) |
|||
@Column(name = ModelConstants.JOB_RESULT_PROPERTY) |
|||
private JsonNode result; |
|||
|
|||
public JobEntity(Job job) { |
|||
super(job); |
|||
this.tenantId = getTenantUuid(job.getTenantId()); |
|||
this.type = job.getType(); |
|||
this.key = job.getKey(); |
|||
this.entityId = job.getEntityId().getId(); |
|||
this.entityType = job.getEntityId().getEntityType(); |
|||
this.status = job.getStatus(); |
|||
this.configuration = toJson(job.getConfiguration()); |
|||
this.result = toJson(job.getResult()); |
|||
} |
|||
|
|||
@Override |
|||
public Job toData() { |
|||
Job job = new Job(); |
|||
job.setId(new JobId(id)); |
|||
job.setCreatedTime(createdTime); |
|||
job.setTenantId(getTenantId(tenantId)); |
|||
job.setType(type); |
|||
job.setKey(key); |
|||
job.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId)); |
|||
job.setStatus(status); |
|||
job.setConfiguration(fromJson(configuration, JobConfiguration.class)); |
|||
job.setResult(fromJson(result, JobResult.class)); |
|||
return job; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,80 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sql.job; |
|||
|
|||
import org.springframework.data.domain.Limit; |
|||
import org.springframework.data.domain.Page; |
|||
import org.springframework.data.domain.Pageable; |
|||
import org.springframework.data.jpa.repository.JpaRepository; |
|||
import org.springframework.data.jpa.repository.Modifying; |
|||
import org.springframework.data.jpa.repository.Query; |
|||
import org.springframework.data.repository.query.Param; |
|||
import org.springframework.stereotype.Repository; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.dao.model.sql.JobEntity; |
|||
|
|||
import java.util.List; |
|||
import java.util.UUID; |
|||
|
|||
@Repository |
|||
public interface JobRepository extends JpaRepository<JobEntity, UUID> { |
|||
|
|||
@Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId " + |
|||
"AND (:types IS NULL OR j.type IN (:types)) " + |
|||
"AND (:statuses IS NULL OR j.status IN (:statuses)) " + |
|||
"AND (:entities IS NULL OR j.entityId IN :entities) " + |
|||
"AND (:startTime <= 0 OR j.createdTime >= :startTime) " + |
|||
"AND (:endTime <= 0 OR j.createdTime <= :endTime) " + |
|||
"AND (:searchText IS NULL OR ilike(j.key, concat('%', :searchText, '%')) = true)") |
|||
Page<JobEntity> findByTenantIdAndTypesAndStatusesAndEntitiesAndTimeAndSearchText(@Param("tenantId") UUID tenantId, |
|||
@Param("types") List<JobType> types, |
|||
@Param("statuses") List<JobStatus> statuses, |
|||
@Param("entities") List<UUID> entities, |
|||
@Param("startTime") long startTime, |
|||
@Param("endTime") long endTime, |
|||
@Param("searchText") String searchText, |
|||
Pageable pageable); |
|||
|
|||
@Query(value = "SELECT * FROM job j WHERE j.id = :id FOR UPDATE", nativeQuery = true) |
|||
JobEntity findByIdForUpdate(UUID id); |
|||
|
|||
@Query("SELECT j FROM JobEntity j WHERE j.tenantId = :tenantId AND j.key = :key " + |
|||
"ORDER BY j.createdTime DESC") |
|||
JobEntity findLatestByTenantIdAndKey(@Param("tenantId") UUID tenantId, @Param("key") String key, Limit limit); |
|||
|
|||
boolean existsByTenantIdAndKeyAndStatusIn(UUID tenantId, String key, List<JobStatus> statuses); |
|||
|
|||
boolean existsByTenantIdAndTypeAndStatusIn(UUID tenantId, JobType type, List<JobStatus> statuses); |
|||
|
|||
boolean existsByTenantIdAndEntityIdAndStatusIn(UUID tenantId, UUID entityId, List<JobStatus> statuses); |
|||
|
|||
@Query(value = "SELECT * FROM job j WHERE j.tenant_id = :tenantId AND j.type = :type " + |
|||
"AND j.status = :status ORDER BY j.created_time ASC, j.id ASC LIMIT 1 FOR UPDATE", nativeQuery = true) |
|||
JobEntity findOldestByTenantIdAndTypeAndStatusForUpdate(UUID tenantId, String type, String status); |
|||
|
|||
@Transactional |
|||
@Modifying |
|||
@Query("DELETE FROM JobEntity j WHERE j.tenantId = :tenantId") |
|||
void deleteByTenantId(UUID tenantId); |
|||
|
|||
@Transactional |
|||
@Modifying |
|||
@Query("DELETE FROM JobEntity j WHERE j.entityId = :entityId") |
|||
int deleteByEntityId(UUID entityId); |
|||
|
|||
} |
|||
@ -0,0 +1,116 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sql.job; |
|||
|
|||
import com.google.common.base.Strings; |
|||
import lombok.RequiredArgsConstructor; |
|||
import org.springframework.data.domain.Limit; |
|||
import org.springframework.data.jpa.repository.JpaRepository; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
import org.thingsboard.server.common.data.job.JobFilter; |
|||
import org.thingsboard.server.common.data.job.JobStatus; |
|||
import org.thingsboard.server.common.data.job.JobType; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.util.CollectionsUtil; |
|||
import org.thingsboard.server.dao.DaoUtil; |
|||
import org.thingsboard.server.dao.job.JobDao; |
|||
import org.thingsboard.server.dao.model.sql.JobEntity; |
|||
import org.thingsboard.server.dao.sql.JpaAbstractDao; |
|||
import org.thingsboard.server.dao.util.SqlDao; |
|||
|
|||
import java.util.Arrays; |
|||
import java.util.UUID; |
|||
|
|||
@Component |
|||
@SqlDao |
|||
@RequiredArgsConstructor |
|||
public class JpaJobDao extends JpaAbstractDao<JobEntity, Job> implements JobDao { |
|||
|
|||
private final JobRepository jobRepository; |
|||
|
|||
@Override |
|||
public PageData<Job> findByTenantIdAndFilter(TenantId tenantId, JobFilter filter, PageLink pageLink) { |
|||
return DaoUtil.toPageData(jobRepository.findByTenantIdAndTypesAndStatusesAndEntitiesAndTimeAndSearchText(tenantId.getId(), |
|||
CollectionsUtil.isEmpty(filter.getTypes()) ? null : filter.getTypes(), |
|||
CollectionsUtil.isEmpty(filter.getStatuses()) ? null : filter.getStatuses(), |
|||
CollectionsUtil.isEmpty(filter.getEntities()) ? null : filter.getEntities(), |
|||
filter.getStartTime() != null ? filter.getStartTime() : 0, |
|||
filter.getEndTime() != null ? filter.getEndTime() : 0, |
|||
Strings.emptyToNull(pageLink.getTextSearch()), DaoUtil.toPageable(pageLink))); |
|||
} |
|||
|
|||
@Override |
|||
public Job findByIdForUpdate(TenantId tenantId, JobId jobId) { |
|||
return DaoUtil.getData(jobRepository.findByIdForUpdate(jobId.getId())); |
|||
} |
|||
|
|||
@Override |
|||
public Job findLatestByTenantIdAndKey(TenantId tenantId, String key) { |
|||
return DaoUtil.getData(jobRepository.findLatestByTenantIdAndKey(tenantId.getId(), key, Limit.of(1))); |
|||
} |
|||
|
|||
@Override |
|||
public boolean existsByTenantAndKeyAndStatusOneOf(TenantId tenantId, String key, JobStatus... statuses) { |
|||
return jobRepository.existsByTenantIdAndKeyAndStatusIn(tenantId.getId(), key, Arrays.stream(statuses).toList()); |
|||
} |
|||
|
|||
@Override |
|||
public boolean existsByTenantIdAndTypeAndStatusOneOf(TenantId tenantId, JobType type, JobStatus... statuses) { |
|||
return jobRepository.existsByTenantIdAndTypeAndStatusIn(tenantId.getId(), type, Arrays.stream(statuses).toList()); |
|||
} |
|||
|
|||
@Override |
|||
public boolean existsByTenantIdAndEntityIdAndStatusOneOf(TenantId tenantId, EntityId entityId, JobStatus... statuses) { |
|||
return jobRepository.existsByTenantIdAndEntityIdAndStatusIn(tenantId.getId(), entityId.getId(), Arrays.stream(statuses).toList()); |
|||
} |
|||
|
|||
@Override |
|||
public Job findOldestByTenantIdAndTypeAndStatusForUpdate(TenantId tenantId, JobType type, JobStatus status) { |
|||
return DaoUtil.getData(jobRepository.findOldestByTenantIdAndTypeAndStatusForUpdate(tenantId.getId(), type.name(), status.name())); |
|||
} |
|||
|
|||
@Override |
|||
public void removeByTenantId(TenantId tenantId) { |
|||
jobRepository.deleteByTenantId(tenantId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public int removeByEntityId(TenantId tenantId, EntityId entityId) { |
|||
return jobRepository.deleteByEntityId(entityId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public EntityType getEntityType() { |
|||
return EntityType.JOB; |
|||
} |
|||
|
|||
@Override |
|||
protected Class<JobEntity> getEntityClass() { |
|||
return JobEntity.class; |
|||
} |
|||
|
|||
@Override |
|||
protected JpaRepository<JobEntity, UUID> getRepository() { |
|||
return jobRepository; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.api; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.common.data.id.JobId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.job.Job; |
|||
|
|||
public interface JobManager { |
|||
|
|||
ListenableFuture<Job> submitJob(Job job); // TODO: rate limits
|
|||
|
|||
void cancelJob(TenantId tenantId, JobId jobId); |
|||
|
|||
void reprocessJob(TenantId tenantId, JobId jobId); |
|||
|
|||
void onJobUpdate(Job job); |
|||
|
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue