|
|
|
@ -17,6 +17,7 @@ package org.thingsboard.server.queue.task; |
|
|
|
|
|
|
|
import jakarta.annotation.PostConstruct; |
|
|
|
import jakarta.annotation.PreDestroy; |
|
|
|
import org.apache.commons.lang3.tuple.Pair; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
@ -25,6 +26,7 @@ import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.common.util.SetCache; |
|
|
|
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.job.JobType; |
|
|
|
import org.thingsboard.server.common.data.job.task.Task; |
|
|
|
import org.thingsboard.server.common.data.job.task.TaskResult; |
|
|
|
@ -42,14 +44,18 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; |
|
|
|
import org.thingsboard.server.queue.settings.TasksQueueConfig; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.CancellationException; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.Executors; |
|
|
|
import java.util.concurrent.Future; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
import java.util.function.Predicate; |
|
|
|
|
|
|
|
public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|
|
|
|
|
|
|
@ -68,6 +74,8 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|
|
|
// Consumer manager for the task queue; its initialization is not visible in this view.
private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer; |
|
|
|
// Cached thread pool to which processTask submits process(task) calls.
// The factory name is the lower-cased job type plus "-task-processor"
// (presumably used as the thread name prefix - confirm against ThingsBoardThreadFactory).
// NOTE: getJobType() is invoked here during field initialization.
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor")); |
|
|
|
|
|
|
|
// Tasks that are currently being executed, paired with their futures.
// Keyed by the consumerKey passed to processTask; entries are added right after
// submission and removed once processing ends. cancelRunningTasks scans the values.
protected final Map<Object, Pair<Task<R>, Future<R>>> currentTasks = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
// Task keys registered via addToDiscarded when a job is reported STOPPED.
// The 60-minute argument is presumably the entry lifetime - confirm against SetCache.
private final SetCache<String> discarded = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); |
|
|
|
// Task keys of jobs reported FAILED; checked when a task is cancelled so that
// tasks of failed jobs are not additionally reported as discarded.
private final SetCache<String> failed = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); |
|
|
|
|
|
|
|
@ -104,21 +112,24 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|
|
|
if (event.getEvent() == ComponentLifecycleEvent.STOPPED) { |
|
|
|
log.info("Adding job {} ({}) to discarded", entityId, tasksKey); |
|
|
|
addToDiscarded(tasksKey); |
|
|
|
cancelRunningTasks(tasksKey); |
|
|
|
} else if (event.getEvent() == ComponentLifecycleEvent.FAILED) { |
|
|
|
log.info("Adding job {} ({}) to failed", entityId, tasksKey); |
|
|
|
failed.add(tasksKey); |
|
|
|
cancelRunningTasks(tasksKey); |
|
|
|
} |
|
|
|
} |
|
|
|
case TENANT -> { |
|
|
|
if (event.getEvent() == ComponentLifecycleEvent.DELETED) { |
|
|
|
deletedTenants.add(entityId.getId()); |
|
|
|
log.info("Adding tenant {} to deleted", entityId); |
|
|
|
deletedTenants.add(entityId.getId()); |
|
|
|
cancelRunningTasks((TenantId) entityId); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer, QueueConfig queueConfig) throws Exception { |
|
|
|
private void processMsgs(List<TbProtoQueueMsg<TaskProto>> msgs, TbQueueConsumer<TbProtoQueueMsg<TaskProto>> consumer, Object consumerKey, QueueConfig queueConfig) throws Exception { |
|
|
|
for (TbProtoQueueMsg<TaskProto> msg : msgs) { |
|
|
|
try { |
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
@ -135,7 +146,7 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
processTask(task); |
|
|
|
processTask(task, consumerKey); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
throw e; |
|
|
|
} catch (Exception e) { |
|
|
|
@ -145,30 +156,39 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|
|
|
consumer.commit(); |
|
|
|
} |
|
|
|
|
|
|
|
private void processTask(T task) throws InterruptedException { |
|
|
|
private void processTask(T task, Object consumerKey) throws InterruptedException { |
|
|
|
task.setAttempt(task.getAttempt() + 1); |
|
|
|
log.debug("Processing task: {}", task); |
|
|
|
Future<R> future = null; |
|
|
|
try { |
|
|
|
long startNs = System.nanoTime(); |
|
|
|
long timeoutMs = getProcessingTimeout(task); |
|
|
|
|
|
|
|
future = taskExecutor.submit(() -> process(task)); |
|
|
|
currentTasks.put(consumerKey, Pair.of(task, future)); |
|
|
|
|
|
|
|
R result; |
|
|
|
try { |
|
|
|
result = future.get(getTaskProcessingTimeout(), TimeUnit.MILLISECONDS); |
|
|
|
result = future.get(timeoutMs, TimeUnit.MILLISECONDS); |
|
|
|
} catch (ExecutionException e) { |
|
|
|
throw e.getCause(); |
|
|
|
} catch (TimeoutException e) { |
|
|
|
throw new TimeoutException("Timeout after " + getTaskProcessingTimeout() + " ms"); |
|
|
|
throw new TimeoutException("Timeout after " + timeoutMs + " ms"); |
|
|
|
} |
|
|
|
|
|
|
|
long timingNs = System.nanoTime() - startNs; |
|
|
|
log.info("Processed task in {} ms: {}", timingNs / 1000000.0, task); |
|
|
|
reportTaskResult(task, result); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
throw e; |
|
|
|
} catch (CancellationException e) { |
|
|
|
if (!failed.contains(task.getKey()) && !deletedTenants.contains(task.getTenantId().getId())) { |
|
|
|
reportTaskDiscarded(task); |
|
|
|
} |
|
|
|
} catch (Throwable e) { |
|
|
|
log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e); |
|
|
|
if (task.getAttempt() <= task.getRetries()) { |
|
|
|
processTask(task); |
|
|
|
processTask(task, consumerKey); |
|
|
|
} else { |
|
|
|
reportTaskFailure(task, e); |
|
|
|
} |
|
|
|
@ -176,11 +196,31 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|
|
|
if (future != null && !future.isDone()) { |
|
|
|
future.cancel(true); |
|
|
|
} |
|
|
|
currentTasks.remove(consumerKey); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/**
 * Executes a single task and produces its result.
 * <p>
 * processTask submits this call to the task executor and waits for the returned
 * future with a timeout. The future may be cancelled with {@code cancel(true)}
 * (on timeout/failure cleanup or when running tasks are cancelled), so
 * implementations may be interrupted while running.
 * <p>
 * An exception thrown here is unwrapped from the {@code ExecutionException},
 * logged, and the task is retried while its attempt count does not exceed
 * {@code task.getRetries()}; after that the failure is reported.
 *
 * @param task the task to execute
 * @return the result that is reported for the task
 * @throws Exception if the task could not be processed
 */
public abstract R process(T task) throws Exception; |
|
|
|
|
|
|
|
private void cancelRunningTasks(String tasksKey) { |
|
|
|
cancelRunningTasks(task -> task.getKey().equals(tasksKey)); |
|
|
|
} |
|
|
|
|
|
|
|
private void cancelRunningTasks(TenantId tenantId) { |
|
|
|
cancelRunningTasks(task -> task.getTenantId().equals(tenantId)); |
|
|
|
} |
|
|
|
|
|
|
|
private void cancelRunningTasks(Predicate<Task<R>> filter) { |
|
|
|
currentTasks.values().forEach(entry -> { |
|
|
|
Task<R> task = entry.getKey(); |
|
|
|
Future<R> future = entry.getValue(); |
|
|
|
if (filter.test(task)) { |
|
|
|
log.debug("Cancelling running task {}", task); |
|
|
|
future.cancel(true); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private void reportTaskFailure(T task, Throwable error) { |
|
|
|
R taskResult = task.toFailed(error); |
|
|
|
reportTaskResult(task, taskResult); |
|
|
|
@ -215,7 +255,7 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> { |
|
|
|
taskExecutor.shutdownNow(); |
|
|
|
} |
|
|
|
|
|
|
|
/**
 * Returns the task processing timeout in milliseconds.
 * The value is passed to {@code Future.get(timeout, TimeUnit.MILLISECONDS)} in
 * processTask and is included in the timeout error message.
 *
 * @return timeout in milliseconds
 */
public abstract long getTaskProcessingTimeout(); |
|
|
|
/**
 * Returns the processing timeout for the given task, in milliseconds.
 * processTask reads it once per attempt, before submitting the task, and passes it
 * to {@code Future.get(timeout, TimeUnit.MILLISECONDS)}; it also appears in the
 * timeout error message.
 *
 * @param task the task about to be processed
 * @return timeout in milliseconds
 */
public abstract long getProcessingTimeout(T task); |
|
|
|
|
|
|
|
/**
 * Returns the job type handled by this processor.
 * <p>
 * NOTE: this is called from the {@code taskExecutor} field initializer, i.e. before
 * any subclass constructor body has run, so implementations should not depend on
 * subclass instance state (returning a constant is safe).
 *
 * @return the job type; its lower-cased name is used in the executor's thread factory name
 */
public abstract JobType getJobType(); |
|
|
|
|
|
|
|
|