|
|
|
@ -33,6 +33,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti |
|
|
|
import org.springframework.web.util.UriComponentsBuilder; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.common.util.SsrfProtectionValidator; |
|
|
|
import org.thingsboard.rule.engine.api.TbHttpClientSettings; |
|
|
|
import org.thingsboard.rule.engine.api.TbContext; |
|
|
|
import org.thingsboard.rule.engine.api.TbNodeException; |
|
|
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|
|
|
@ -42,6 +43,7 @@ import org.thingsboard.rule.engine.credentials.CredentialsType; |
|
|
|
import org.thingsboard.server.common.data.StringUtils; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|
|
|
import reactor.core.scheduler.Schedulers; |
|
|
|
import reactor.netty.http.client.HttpClient; |
|
|
|
import reactor.netty.resources.ConnectionProvider; |
|
|
|
import reactor.netty.transport.ProxyProvider; |
|
|
|
@ -53,8 +55,11 @@ import java.util.Base64; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Properties; |
|
|
|
import java.util.concurrent.BlockingQueue; |
|
|
|
import java.util.concurrent.LinkedBlockingQueue; |
|
|
|
import java.util.concurrent.Semaphore; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.atomic.AtomicLong; |
|
|
|
import java.util.function.BiConsumer; |
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
@ -84,17 +89,48 @@ public class TbHttpClient { |
|
|
|
|
|
|
|
public static final String MAX_IN_MEMORY_BUFFER_SIZE_IN_KB = "tb.http.maxInMemoryBufferSizeInKb"; |
|
|
|
|
|
|
|
private static final long ANOMALY_REPORT_INTERVAL_MS = 60_000; |
|
|
|
|
|
|
|
private final TbRestApiCallNodeConfiguration config; |
|
|
|
private final String tenantId; |
|
|
|
private final String nodeId; |
|
|
|
private final TbHttpClientSettings settings; |
|
|
|
|
|
|
|
private EventLoopGroup eventLoopGroup; |
|
|
|
private WebClient webClient; |
|
|
|
private Semaphore semaphore; |
|
|
|
private BlockingQueue<PendingTask> pendingQueue; |
|
|
|
|
|
|
|
private final AtomicLong dispatchedCount = new AtomicLong(); |
|
|
|
private final AtomicLong successCount = new AtomicLong(); |
|
|
|
private final AtomicLong failureCount = new AtomicLong(); |
|
|
|
private final AtomicLong droppedFullCount = new AtomicLong(); |
|
|
|
private final AtomicLong droppedStaleCount = new AtomicLong(); |
|
|
|
private volatile long lastAnomalyReportAt = 0; |
|
|
|
|
|
|
|
private record PendingTask( |
|
|
|
TbContext ctx, |
|
|
|
TbMsg msg, |
|
|
|
Consumer<TbMsg> onSuccess, |
|
|
|
BiConsumer<TbMsg, Throwable> onFailure, |
|
|
|
long enqueuedNanos) {} |
|
|
|
|
|
|
|
/**
 * Convenience constructor for callers that have no tenant/node identity or
 * custom settings: delegates to the full constructor with {@code "n/a"} for
 * both identifiers and {@link TbHttpClientSettings#DEFAULT}.
 *
 * @param config               REST API call node configuration
 * @param eventLoopGroupShared event loop group passed through to the full constructor
 * @throws TbNodeException propagated from the full constructor
 */
TbHttpClient(TbRestApiCallNodeConfiguration config,
             EventLoopGroup eventLoopGroupShared) throws TbNodeException {
    this(
            config,
            eventLoopGroupShared,
            "n/a",
            "n/a",
            TbHttpClientSettings.DEFAULT
    );
}
|
|
|
|
|
|
|
TbHttpClient(TbRestApiCallNodeConfiguration config, EventLoopGroup eventLoopGroupShared, |
|
|
|
String tenantId, String nodeId, TbHttpClientSettings settings) throws TbNodeException { |
|
|
|
try { |
|
|
|
this.config = config; |
|
|
|
if (config.getMaxParallelRequestsCount() > 0) { |
|
|
|
semaphore = new Semaphore(config.getMaxParallelRequestsCount()); |
|
|
|
this.tenantId = tenantId; |
|
|
|
this.nodeId = nodeId; |
|
|
|
this.settings = settings; |
|
|
|
int effectiveParallel = effectiveMax(config.getMaxParallelRequestsCount(), settings.getMaxParallelRequests()); |
|
|
|
if (effectiveParallel > 0) { |
|
|
|
semaphore = new Semaphore(effectiveParallel); |
|
|
|
int effectivePending = effectiveMax(0, settings.getMaxPendingRequests()); |
|
|
|
pendingQueue = effectivePending > 0 ? new LinkedBlockingQueue<>(effectivePending) : new LinkedBlockingQueue<>(); |
|
|
|
} |
|
|
|
|
|
|
|
ConnectionProvider connectionProvider = ConnectionProvider |
|
|
|
@ -155,16 +191,20 @@ public class TbHttpClient { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private int getPoolMaxConnections() { |
|
|
|
String poolMaxConnectionsEnv = System.getenv("TB_RE_HTTP_CLIENT_POOL_MAX_CONNECTIONS"); |
|
|
|
/** |
|
|
|
* Returns the effective limit: {@code min(userMax, systemMax)} when both are positive, |
|
|
|
* {@code systemMax} when only the system ceiling is set, or {@code userMax} otherwise. |
|
|
|
* A value of {@code 0} means unlimited. |
|
|
|
*/ |
|
|
|
private static int effectiveMax(int userMax, int systemMax) { |
|
|
|
if (systemMax <= 0) return userMax; |
|
|
|
if (userMax <= 0) return systemMax; |
|
|
|
return Math.min(userMax, systemMax); |
|
|
|
} |
|
|
|
|
|
|
|
int poolMaxConnections; |
|
|
|
if (poolMaxConnectionsEnv != null) { |
|
|
|
poolMaxConnections = Integer.parseInt(poolMaxConnectionsEnv); |
|
|
|
} else { |
|
|
|
poolMaxConnections = ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS; |
|
|
|
} |
|
|
|
return poolMaxConnections; |
|
|
|
private int getPoolMaxConnections() { |
|
|
|
int configured = settings.getPoolMaxConnections(); |
|
|
|
return configured > 0 ? configured : ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS; |
|
|
|
} |
|
|
|
|
|
|
|
private void validateMaxInMemoryBufferSize(TbRestApiCallNodeConfiguration config) throws TbNodeException { |
|
|
|
@ -207,54 +247,150 @@ public class TbHttpClient { |
|
|
|
if (this.eventLoopGroup != null) { |
|
|
|
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); |
|
|
|
} |
|
|
|
long full = droppedFullCount.get(); |
|
|
|
long stale = droppedStaleCount.get(); |
|
|
|
int availablePermits = semaphore != null ? semaphore.availablePermits() : -1; |
|
|
|
if (full > 0 || stale > 0) { |
|
|
|
log.warn("[{}][{}] REST API call node destroyed with anomalies: " + |
|
|
|
"droppedQueueFull={}, droppedStale={}, dispatched={}, success={}, failure={}, semaphorePermits={}.", |
|
|
|
tenantId, nodeId, full, stale, |
|
|
|
dispatchedCount.get(), successCount.get(), failureCount.get(), availablePermits); |
|
|
|
} else { |
|
|
|
log.debug("[{}][{}] REST API call node destroyed. dispatched={}, success={}, failure={}, semaphorePermits={}.", |
|
|
|
tenantId, nodeId, dispatchedCount.get(), successCount.get(), failureCount.get(), availablePermits); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void processMessage(TbContext ctx, TbMsg msg, |
|
|
|
Consumer<TbMsg> onSuccess, |
|
|
|
BiConsumer<TbMsg, Throwable> onFailure) { |
|
|
|
try { |
|
|
|
if (semaphore != null && !semaphore.tryAcquire(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS)) { |
|
|
|
onFailure.accept(msg, new RuntimeException("Timeout during waiting for reply!")); |
|
|
|
return; |
|
|
|
if (semaphore == null) { |
|
|
|
doHttpCall(new PendingTask(ctx, msg, onSuccess, onFailure, 0L)); |
|
|
|
return; |
|
|
|
} |
|
|
|
if (!pendingQueue.offer(new PendingTask(ctx, msg, onSuccess, onFailure, System.nanoTime()))) { |
|
|
|
droppedFullCount.incrementAndGet(); |
|
|
|
log.debug("[{}][{}] REST API call queue full, dropping msg {}.", tenantId, nodeId, msg.getId()); |
|
|
|
maybeReportAnomalies(); |
|
|
|
onFailure.accept(msg, new RuntimeException("Max pending requests limit exceeded!")); |
|
|
|
return; |
|
|
|
} |
|
|
|
tryProcess(); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Tries to acquire one concurrency slot and fire the next queued task. |
|
|
|
* Stale messages (whose message pack has expired) are silently dropped. |
|
|
|
* Safe to call from any thread under high concurrency. |
|
|
|
*/ |
|
|
|
private void tryProcess() { |
|
|
|
while (true) { |
|
|
|
if (!semaphore.tryAcquire()) { |
|
|
|
return; // all slots are in use; a callback will call tryProcess() when one frees up
|
|
|
|
} |
|
|
|
PendingTask next = pendingQueue.poll(); |
|
|
|
if (next == null) { |
|
|
|
semaphore.release(); |
|
|
|
return; // queue is empty; slot released
|
|
|
|
} |
|
|
|
if (!next.msg().isValid()) { |
|
|
|
semaphore.release(); |
|
|
|
droppedStaleCount.incrementAndGet(); |
|
|
|
log.debug("[{}][{}] Dropping stale msg {} from REST API call queue (queueDepth={}).", |
|
|
|
tenantId, nodeId, next.msg().getId(), pendingQueue.size()); |
|
|
|
next.onFailure().accept(next.msg(), new RuntimeException("Message is no longer valid. Dropped from queue.")); |
|
|
|
maybeReportAnomalies(); |
|
|
|
continue; // slot released — loop to check if there's a valid next item
|
|
|
|
} |
|
|
|
dispatchedCount.incrementAndGet(); |
|
|
|
if (doHttpCall(next)) { |
|
|
|
return; // async HTTP call started — its callback will call tryProcess()
|
|
|
|
} |
|
|
|
// synchronous failure — semaphore already released in doHttpCall, loop to try next task
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg); |
|
|
|
private void maybeReportAnomalies() { |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
if (now - lastAnomalyReportAt < ANOMALY_REPORT_INTERVAL_MS) { |
|
|
|
return; |
|
|
|
} |
|
|
|
lastAnomalyReportAt = now; |
|
|
|
int queueSize = pendingQueue != null ? pendingQueue.size() : 0; |
|
|
|
int queueRemaining = pendingQueue != null ? pendingQueue.remainingCapacity() : Integer.MAX_VALUE; |
|
|
|
int availablePermits = semaphore != null ? semaphore.availablePermits() : -1; |
|
|
|
log.warn("[{}][{}] REST API call node anomalies: droppedQueueFull={}, droppedStale={} " + |
|
|
|
"(dispatched={}, success={}, failure={}, queueDepth={}, queueRemaining={}, semaphorePermits={}).", |
|
|
|
tenantId, nodeId, |
|
|
|
droppedFullCount.get(), droppedStaleCount.get(), |
|
|
|
dispatchedCount.get(), successCount.get(), failureCount.get(), |
|
|
|
queueSize, queueRemaining, availablePermits); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Initiates an async HTTP call for the given task. |
|
|
|
* |
|
|
|
* @return {@code true} if the async subscription was started and the semaphore slot is now |
|
|
|
* owned by the callback (which will release it and call {@link #tryProcess()}). |
|
|
|
* {@code false} if a synchronous exception occurred before the subscription was |
|
|
|
* registered; the semaphore slot has already been released and the caller should |
|
|
|
* loop rather than recurse to avoid stack overflow when many queued tasks fail |
|
|
|
* synchronously (e.g. misconfigured URL pattern). |
|
|
|
*/ |
|
|
|
private boolean doHttpCall(PendingTask task) { |
|
|
|
boolean asyncStarted = false; |
|
|
|
try { |
|
|
|
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), task.msg()); |
|
|
|
HttpMethod method = HttpMethod.valueOf(config.getRequestMethod()); |
|
|
|
URI uri = buildEncodedUri(endpointUrl); |
|
|
|
|
|
|
|
RequestBodySpec request = webClient |
|
|
|
RequestBodySpec req = webClient |
|
|
|
.method(method) |
|
|
|
.uri(uri) |
|
|
|
.headers(headers -> prepareHeaders(headers, msg)); |
|
|
|
.headers(headers -> prepareHeaders(headers, task.msg())); |
|
|
|
|
|
|
|
if ((HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method) || |
|
|
|
HttpMethod.PATCH.equals(method) || HttpMethod.DELETE.equals(method)) && |
|
|
|
!config.isIgnoreRequestBody()) { |
|
|
|
request.body(BodyInserters.fromValue(getData(msg, config.isParseToPlainText()))); |
|
|
|
req.body(BodyInserters.fromValue(getData(task.msg(), config.isParseToPlainText()))); |
|
|
|
} |
|
|
|
|
|
|
|
request |
|
|
|
.retrieve() |
|
|
|
req.retrieve() |
|
|
|
.toEntity(String.class) |
|
|
|
.subscribe(responseEntity -> { |
|
|
|
.publishOn(Schedulers.fromExecutor(task.ctx().getExternalCallExecutor())) |
|
|
|
.doFinally(signalType -> { |
|
|
|
// Runs exactly once after onComplete, onError, or cancel — the only
|
|
|
|
// place that releases the permit for the async path.
|
|
|
|
if (semaphore != null) { |
|
|
|
semaphore.release(); |
|
|
|
tryProcess(); |
|
|
|
} |
|
|
|
|
|
|
|
}) |
|
|
|
.subscribe(responseEntity -> { |
|
|
|
if (responseEntity.getStatusCode().is2xxSuccessful()) { |
|
|
|
onSuccess.accept(processResponse(ctx, msg, responseEntity)); |
|
|
|
successCount.incrementAndGet(); |
|
|
|
task.onSuccess().accept(processResponse(task.ctx(), task.msg(), responseEntity)); |
|
|
|
} else { |
|
|
|
onFailure.accept(processFailureResponse(msg, responseEntity), null); |
|
|
|
failureCount.incrementAndGet(); |
|
|
|
task.onFailure().accept(processFailureResponse(task.msg(), responseEntity), null); |
|
|
|
} |
|
|
|
}, throwable -> { |
|
|
|
if (semaphore != null) { |
|
|
|
semaphore.release(); |
|
|
|
} |
|
|
|
|
|
|
|
onFailure.accept(processException(msg, throwable), processThrowable(throwable)); |
|
|
|
failureCount.incrementAndGet(); |
|
|
|
task.onFailure().accept(processException(task.msg(), throwable), processThrowable(throwable)); |
|
|
|
}); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
log.warn("Timeout during waiting for reply!", e); |
|
|
|
asyncStarted = true; |
|
|
|
return true; |
|
|
|
} catch (Exception e) { |
|
|
|
failureCount.incrementAndGet(); |
|
|
|
task.onFailure().accept(processException(task.msg(), e), processThrowable(e)); |
|
|
|
return false; |
|
|
|
} finally { |
|
|
|
// Synchronous permit release: only when the async pipeline was never registered
|
|
|
|
// (asyncStarted=false). If it was, doFinally owns the release.
|
|
|
|
// tryProcess() is intentionally not called here — the caller loops iteratively.
|
|
|
|
if (!asyncStarted && semaphore != null) { |
|
|
|
semaphore.release(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|