Browse Source

VC request timing out

pull/6689/head
Viacheslav Klimov 4 years ago
parent
commit
2a5c7528ac
  1. 34
      application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java
  2. 4
      application/src/main/java/org/thingsboard/server/service/sync/vc/data/PendingGitRequest.java
  3. 1
      application/src/main/resources/thingsboard.yml

34
application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
@ -34,9 +35,9 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
import org.thingsboard.server.common.data.sync.vc.EntityVersionsDiff;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest;
@ -53,6 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.VersionControlRespon
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest;
@ -67,7 +69,13 @@ import org.thingsboard.server.service.sync.vc.data.PendingGitRequest;
import org.thingsboard.server.service.sync.vc.data.VersionsDiffGitRequest;
import org.thingsboard.server.service.sync.vc.data.VoidGitRequest;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -81,16 +89,22 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private final TbClusterService clusterService;
private final DataDecodingEncodingService encodingService;
private final DefaultEntitiesVersionControlService entitiesVersionControlService;
private final SchedulerComponent scheduler;
private final Map<UUID, PendingGitRequest<?>> pendingRequestMap = new HashMap<>();
@Value("${queue.vc.request-timeout:60000}")
private int requestTimeout;
public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService,
DataDecodingEncodingService encodingService,
@Lazy DefaultEntitiesVersionControlService entitiesVersionControlService) {
@Lazy DefaultEntitiesVersionControlService entitiesVersionControlService,
SchedulerComponent scheduler) {
this.serviceInfoProvider = serviceInfoProvider;
this.clusterService = clusterService;
this.encodingService = encodingService;
this.entitiesVersionControlService = entitiesVersionControlService;
this.scheduler = scheduler;
}
@Override
@ -275,6 +289,9 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
var requestBody = enrichFunction.apply(newRequestProto(request, settings));
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback);
request.setTimeoutTask(scheduler.schedule(() -> {
processTimeout(request.getRequestId());
}, requestTimeout, TimeUnit.MILLISECONDS));
} else {
throw new RuntimeException("Future is already done!");
}
@ -338,12 +355,13 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override
public void processResponse(VersionControlResponseMsg vcResponseMsg) {
UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB());
PendingGitRequest<?> request = pendingRequestMap.get(requestId);
PendingGitRequest<?> request = pendingRequestMap.remove(requestId);
if (request == null) {
log.debug("[{}] received stale response: {}", requestId, vcResponseMsg);
return;
} else {
log.debug("[{}] processing response: {}", requestId, vcResponseMsg);
request.getTimeoutTask().cancel(true);
}
var future = request.getFuture();
if (!StringUtils.isEmpty(vcResponseMsg.getError())) {
@ -399,6 +417,14 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
}
}
private void processTimeout(UUID requestId) {
PendingGitRequest<?> pendingRequest = pendingRequestMap.remove(requestId);
if (pendingRequest != null) {
log.debug("[{}] request timed out ({} ms}", requestId, requestTimeout);
pendingRequest.getFuture().setException(new TimeoutException("Request timed out"));
}
}
private PageData<EntityVersion> toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) {
var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList());
return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext());

4
application/src/main/java/org/thingsboard/server/service/sync/vc/data/PendingGitRequest.java

@ -17,9 +17,11 @@ package org.thingsboard.server.service.sync.vc.data;
import com.google.common.util.concurrent.SettableFuture;
import lombok.Getter;
import lombok.Setter;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
@Getter
public class PendingGitRequest<T> {
@ -28,6 +30,8 @@ public class PendingGitRequest<T> {
private final UUID requestId;
private final TenantId tenantId;
private final SettableFuture<T> future;
@Setter
private ScheduledFuture<?> timeoutTask;
public PendingGitRequest(TenantId tenantId) {
this.createdTime = System.currentTimeMillis();

1
application/src/main/resources/thingsboard.yml

@ -1039,6 +1039,7 @@ queue:
partitions: "${TB_QUEUE_VC_PARTITIONS:10}"
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}"
request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:60000}"
js:
# JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"

Loading…
Cancel
Save