diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index a9692f4b9d..925b60de76 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -35,6 +35,7 @@ import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.JobManager; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MqttClientSettings; +import org.thingsboard.rule.engine.api.TbHttpClientSettings; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineAiChatModelService; import org.thingsboard.rule.engine.api.SmsService; @@ -684,6 +685,10 @@ public class ActorSystemContext { @Getter private MqttClientSettings mqttClientSettings; + @Autowired(required = false) + @Getter + private TbHttpClientSettings tbHttpClientSettings; + @Getter @Setter private TbActorSystem actorSystem; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 66a15a651e..3a572503d6 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -27,6 +27,7 @@ import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.JobManager; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MqttClientSettings; +import org.thingsboard.rule.engine.api.TbHttpClientSettings; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineAiChatModelService; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; @@ -1062,6 +1063,11 @@ public class DefaultTbContext implements TbContext { return mainCtx.getMqttClientSettings(); } + @Override + public TbHttpClientSettings getTbHttpClientSettings() { + return mainCtx.getTbHttpClientSettings(); + } + private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("ruleNodeId", ruleNodeId.toString()); diff --git a/application/src/main/java/org/thingsboard/server/config/TbHttpClientSettingsComponent.java b/application/src/main/java/org/thingsboard/server/config/TbHttpClientSettingsComponent.java new file mode 100644 index 0000000000..a5ad4e8742 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/config/TbHttpClientSettingsComponent.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.rule.engine.api.TbHttpClientSettings; +import org.thingsboard.server.queue.util.TbRuleEngineComponent; + +@TbRuleEngineComponent +@Component +public class TbHttpClientSettingsComponent implements TbHttpClientSettings { + + @Value("${actors.rule.external.http_client.max_parallel_requests:0}") + private int maxParallelRequests; + + @Value("${actors.rule.external.http_client.max_pending_requests:0}") + private int maxPendingRequests; + + @Value("${actors.rule.external.http_client.pool_max_connections:0}") + private int poolMaxConnections; + + @Override + public int getMaxParallelRequests() { + return maxParallelRequests; + } + + @Override + public int getMaxPendingRequests() { + return maxPendingRequests; + } + + @Override + public int getPoolMaxConnections() { + return poolMaxConnections; + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 81b56aff28..bac005d836 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -567,6 +567,18 @@ actors: # Use this when your rule chains need to reach devices on private networks (e.g., 192.168.1.0/24). # Example: "192.168.1.0/24,10.0.0.0/8,my-internal-service.corp" ssrf_allowed_hosts: "${SSRF_ALLOWED_HOSTS:}" + http_client: + # Server-level ceiling for parallel in-flight HTTP requests per external HTTP rule node instance. + # Applied as min(nodeConfig, systemMax) when set; 0 = no system-level restriction (node config wins). + max_parallel_requests: "${ACTORS_RULE_EXTERNAL_HTTP_CLIENT_MAX_PARALLEL_REQUESTS:0}" + # Server-level ceiling for the pending-request queue depth per external HTTP rule node instance. + # Applied as min(nodeConfig, systemMax) when set; 0 = no system-level restriction. + max_pending_requests: "${ACTORS_RULE_EXTERNAL_HTTP_CLIENT_MAX_PENDING_REQUESTS:0}" + # Maximum number of TCP connections in the reactor-netty connection pool per external HTTP rule node instance. + # Defaults to reactor-netty's ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS: max(availableProcessors, 8) * 2 + # (e.g. 16 on an 8-core host). Increase for high-throughput nodes calling remote services that support many connections. + # 0 = use reactor-netty default. + pool_max_connections: "${ACTORS_RULE_EXTERNAL_HTTP_CLIENT_POOL_MAX_CONNECTIONS:${TB_RE_HTTP_CLIENT_POOL_MAX_CONNECTIONS:0}}" rpc: # Maximum number of persistent RPC call retries in case of failed request delivery. max_retries: "${ACTORS_RPC_MAX_RETRIES:5}" diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index dc27f79ee2..464c2df532 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -438,4 +438,10 @@ public interface TbContext { MqttClientSettings getMqttClientSettings(); + // Server-level safety caps for the HTTP client used by the REST API Call rule node (read from thingsboard.yml) + + default TbHttpClientSettings getTbHttpClientSettings() { + return TbHttpClientSettings.DEFAULT; + } + } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbHttpClientSettings.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbHttpClientSettings.java new file mode 100644 index 0000000000..658f82c0d7 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbHttpClientSettings.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +/** + * Server-level safety caps for the HTTP client used by the REST API Call rule node. + * Values are read from {@code thingsboard.yml} (or the corresponding environment variables) + * and applied as hard ceilings on top of the per-node tenant configuration. + * A value of {@code 0} means no system-level restriction. + */ +public interface TbHttpClientSettings { + + /** System ceiling for {@code maxParallelRequestsCount}. 0 = no system limit. */ + int getMaxParallelRequests(); + + /** System ceiling for the pending-request queue depth. 0 = no system limit. */ + int getMaxPendingRequests(); + + /** + * Maximum number of TCP connections in the reactor-netty pool per node instance. + * 0 = use reactor-netty's default: {@code max(availableProcessors, 8) * 2}. + */ + int getPoolMaxConnections(); + + TbHttpClientSettings DEFAULT = new TbHttpClientSettings() { + @Override + public int getMaxParallelRequests() { return 0; } + + @Override + public int getMaxPendingRequests() { return 0; } + + @Override + public int getPoolMaxConnections() { return 0; } + }; + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index df9ce0194b..de1e73cb66 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -33,6 +33,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import org.springframework.web.util.UriComponentsBuilder; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.SsrfProtectionValidator; +import org.thingsboard.rule.engine.api.TbHttpClientSettings; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; @@ -42,6 +43,7 @@ import org.thingsboard.rule.engine.credentials.CredentialsType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.transport.ProxyProvider; @@ -53,8 +55,11 @@ import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -84,17 +89,48 @@ public class TbHttpClient { public static final String MAX_IN_MEMORY_BUFFER_SIZE_IN_KB = "tb.http.maxInMemoryBufferSizeInKb"; + private static final long ANOMALY_REPORT_INTERVAL_MS = 60_000; + private final TbRestApiCallNodeConfiguration config; + private final String tenantId; + private final String nodeId; + private final TbHttpClientSettings settings; private EventLoopGroup eventLoopGroup; private WebClient webClient; private Semaphore semaphore; + private BlockingQueue pendingQueue; + + private final AtomicLong dispatchedCount = new AtomicLong(); + private final AtomicLong successCount = new AtomicLong(); + private final AtomicLong failureCount = new AtomicLong(); + private final AtomicLong droppedFullCount = new AtomicLong(); + private final AtomicLong droppedStaleCount = new AtomicLong(); + private volatile long lastAnomalyReportAt = 0; + + private record PendingTask( + TbContext ctx, + TbMsg msg, + Consumer onSuccess, + BiConsumer onFailure, + long enqueuedNanos) {} TbHttpClient(TbRestApiCallNodeConfiguration config, EventLoopGroup eventLoopGroupShared) throws TbNodeException { + this(config, eventLoopGroupShared, "n/a", "n/a", TbHttpClientSettings.DEFAULT); + } + + TbHttpClient(TbRestApiCallNodeConfiguration config, EventLoopGroup eventLoopGroupShared, + String tenantId, String nodeId, TbHttpClientSettings settings) throws TbNodeException { try { this.config = config; - if (config.getMaxParallelRequestsCount() > 0) { - semaphore = new Semaphore(config.getMaxParallelRequestsCount()); + this.tenantId = tenantId; + this.nodeId = nodeId; + this.settings = settings; + int effectiveParallel = effectiveMax(config.getMaxParallelRequestsCount(), settings.getMaxParallelRequests()); + if (effectiveParallel > 0) { + semaphore = new Semaphore(effectiveParallel); + int effectivePending = effectiveMax(0, settings.getMaxPendingRequests()); + pendingQueue = effectivePending > 0 ? new LinkedBlockingQueue<>(effectivePending) : new LinkedBlockingQueue<>(); } ConnectionProvider connectionProvider = ConnectionProvider @@ -155,16 +191,20 @@ public class TbHttpClient { } } - private int getPoolMaxConnections() { - String poolMaxConnectionsEnv = System.getenv("TB_RE_HTTP_CLIENT_POOL_MAX_CONNECTIONS"); + /** + * Returns the effective limit: {@code min(userMax, systemMax)} when both are positive, + * {@code systemMax} when only the system ceiling is set, or {@code userMax} otherwise. + * A value of {@code 0} means unlimited. + */ + private static int effectiveMax(int userMax, int systemMax) { + if (systemMax <= 0) return userMax; + if (userMax <= 0) return systemMax; + return Math.min(userMax, systemMax); + } - int poolMaxConnections; - if (poolMaxConnectionsEnv != null) { - poolMaxConnections = Integer.parseInt(poolMaxConnectionsEnv); - } else { - poolMaxConnections = ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS; - } - return poolMaxConnections; + private int getPoolMaxConnections() { + int configured = settings.getPoolMaxConnections(); + return configured > 0 ? configured : ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS; } private void validateMaxInMemoryBufferSize(TbRestApiCallNodeConfiguration config) throws TbNodeException { @@ -207,54 +247,150 @@ public class TbHttpClient { if (this.eventLoopGroup != null) { this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); } + long full = droppedFullCount.get(); + long stale = droppedStaleCount.get(); + int availablePermits = semaphore != null ? semaphore.availablePermits() : -1; + if (full > 0 || stale > 0) { + log.warn("[{}][{}] REST API call node destroyed with anomalies: " + + "droppedQueueFull={}, droppedStale={}, dispatched={}, success={}, failure={}, semaphorePermits={}.", + tenantId, nodeId, full, stale, + dispatchedCount.get(), successCount.get(), failureCount.get(), availablePermits); + } else { + log.debug("[{}][{}] REST API call node destroyed. dispatched={}, success={}, failure={}, semaphorePermits={}.", + tenantId, nodeId, dispatchedCount.get(), successCount.get(), failureCount.get(), availablePermits); + } } public void processMessage(TbContext ctx, TbMsg msg, Consumer onSuccess, BiConsumer onFailure) { - try { - if (semaphore != null && !semaphore.tryAcquire(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS)) { - onFailure.accept(msg, new RuntimeException("Timeout during waiting for reply!")); - return; + if (semaphore == null) { + doHttpCall(new PendingTask(ctx, msg, onSuccess, onFailure, 0L)); + return; + } + if (!pendingQueue.offer(new PendingTask(ctx, msg, onSuccess, onFailure, System.nanoTime()))) { + droppedFullCount.incrementAndGet(); + log.debug("[{}][{}] REST API call queue full, dropping msg {}.", tenantId, nodeId, msg.getId()); + maybeReportAnomalies(); + onFailure.accept(msg, new RuntimeException("Max pending requests limit exceeded!")); + return; + } + tryProcess(); + } + + /** + * Tries to acquire one concurrency slot and fire the next queued task. + * Stale messages (whose message pack has expired) are silently dropped. + * Safe to call from any thread under high concurrency. + */ + private void tryProcess() { + while (true) { + if (!semaphore.tryAcquire()) { + return; // all slots are in use; a callback will call tryProcess() when one frees up + } + PendingTask next = pendingQueue.poll(); + if (next == null) { + semaphore.release(); + return; // queue is empty; slot released } + if (!next.msg().isValid()) { + semaphore.release(); + droppedStaleCount.incrementAndGet(); + log.debug("[{}][{}] Dropping stale msg {} from REST API call queue (queueDepth={}).", + tenantId, nodeId, next.msg().getId(), pendingQueue.size()); + next.onFailure().accept(next.msg(), new RuntimeException("Message is no longer valid. Dropped from queue.")); + maybeReportAnomalies(); + continue; // slot released — loop to check if there's a valid next item + } + dispatchedCount.incrementAndGet(); + if (doHttpCall(next)) { + return; // async HTTP call started — its callback will call tryProcess() + } + // synchronous failure — semaphore already released in doHttpCall, loop to try next task + } + } - String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg); + private void maybeReportAnomalies() { + long now = System.currentTimeMillis(); + if (now - lastAnomalyReportAt < ANOMALY_REPORT_INTERVAL_MS) { + return; + } + lastAnomalyReportAt = now; + int queueSize = pendingQueue != null ? pendingQueue.size() : 0; + int queueRemaining = pendingQueue != null ? pendingQueue.remainingCapacity() : Integer.MAX_VALUE; + int availablePermits = semaphore != null ? semaphore.availablePermits() : -1; + log.warn("[{}][{}] REST API call node anomalies: droppedQueueFull={}, droppedStale={} " + + "(dispatched={}, success={}, failure={}, queueDepth={}, queueRemaining={}, semaphorePermits={}).", + tenantId, nodeId, + droppedFullCount.get(), droppedStaleCount.get(), + dispatchedCount.get(), successCount.get(), failureCount.get(), + queueSize, queueRemaining, availablePermits); + } + + /** + * Initiates an async HTTP call for the given task. + * + * @return {@code true} if the async subscription was started and the semaphore slot is now + * owned by the callback (which will release it and call {@link #tryProcess()}). + * {@code false} if a synchronous exception occurred before the subscription was + * registered; the semaphore slot has already been released and the caller should + * loop rather than recurse to avoid stack overflow when many queued tasks fail + * synchronously (e.g. misconfigured URL pattern). + */ + private boolean doHttpCall(PendingTask task) { + boolean asyncStarted = false; + try { + String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), task.msg()); HttpMethod method = HttpMethod.valueOf(config.getRequestMethod()); URI uri = buildEncodedUri(endpointUrl); - RequestBodySpec request = webClient + RequestBodySpec req = webClient .method(method) .uri(uri) - .headers(headers -> prepareHeaders(headers, msg)); + .headers(headers -> prepareHeaders(headers, task.msg())); if ((HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method) || HttpMethod.PATCH.equals(method) || HttpMethod.DELETE.equals(method)) && !config.isIgnoreRequestBody()) { - request.body(BodyInserters.fromValue(getData(msg, config.isParseToPlainText()))); + req.body(BodyInserters.fromValue(getData(task.msg(), config.isParseToPlainText()))); } - request - .retrieve() + req.retrieve() .toEntity(String.class) - .subscribe(responseEntity -> { + .publishOn(Schedulers.fromExecutor(task.ctx().getExternalCallExecutor())) + .doFinally(signalType -> { + // Runs exactly once after onComplete, onError, or cancel — the only + // place that releases the permit for the async path. if (semaphore != null) { semaphore.release(); + tryProcess(); } - + }) + .subscribe(responseEntity -> { if (responseEntity.getStatusCode().is2xxSuccessful()) { - onSuccess.accept(processResponse(ctx, msg, responseEntity)); + successCount.incrementAndGet(); + task.onSuccess().accept(processResponse(task.ctx(), task.msg(), responseEntity)); } else { - onFailure.accept(processFailureResponse(msg, responseEntity), null); + failureCount.incrementAndGet(); + task.onFailure().accept(processFailureResponse(task.msg(), responseEntity), null); } }, throwable -> { - if (semaphore != null) { - semaphore.release(); - } - - onFailure.accept(processException(msg, throwable), processThrowable(throwable)); + failureCount.incrementAndGet(); + task.onFailure().accept(processException(task.msg(), throwable), processThrowable(throwable)); }); - } catch (InterruptedException e) { - log.warn("Timeout during waiting for reply!", e); + asyncStarted = true; + return true; + } catch (Exception e) { + failureCount.incrementAndGet(); + task.onFailure().accept(processException(task.msg(), e), processThrowable(e)); + return false; + } finally { + // Synchronous permit release: only when the async pipeline was never registered + // (asyncStarted=false). If it was, doFinally owns the release. + // tryProcess() is intentionally not called here — the caller loops iteratively. + if (!asyncStarted && semaphore != null) { + semaphore.release(); + } } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java index 0217edf5e0..02ed33138d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.rest; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.thingsboard.rule.engine.api.TbHttpClientSettings; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -58,7 +59,11 @@ public class TbRestApiCallNode extends TbAbstractExternalNode { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { super.init(ctx); TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class); - httpClient = new TbHttpClient(config, ctx.getSharedEventLoop()); + TbHttpClientSettings httpClientSettings = ctx.getTbHttpClientSettings(); + httpClient = new TbHttpClient(config, ctx.getSharedEventLoop(), + ctx.getTenantId() != null ? ctx.getTenantId().getId().toString() : "n/a", + ctx.getSelfId() != null ? ctx.getSelfId().getId().toString() : "n/a", + httpClientSettings != null ? httpClientSettings : TbHttpClientSettings.DEFAULT); } @Override diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java index a33d25b038..8b31c5a5ac 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java @@ -27,6 +27,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockserver.integration.ClientAndServer; import org.springframework.util.LinkedMultiValueMap; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; @@ -156,6 +157,7 @@ public class TbHttpClientTest { .build(); var ctx = mock(TbContext.class); + when(ctx.getExternalCallExecutor()).thenReturn(DirectListeningExecutor.INSTANCE); when(ctx.transformMsg( eq(msg), eq(msg.getMetaData()), diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbRestApiCallNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbRestApiCallNodeTest.java index a93417275f..493c5abf8c 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbRestApiCallNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbRestApiCallNodeTest.java @@ -27,13 +27,17 @@ import org.apache.http.protocol.HttpRequestHandler; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.DirectListeningExecutor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.api.TbHttpClientSettings; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -43,6 +47,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -53,15 +58,22 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest { + static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30); + @Spy private TbRestApiCallNode restNode; @@ -125,6 +137,8 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest { } }); + given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE); + TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration(); config.setRequestMethod("DELETE"); config.setHeaders(Collections.singletonMap("Foo", "Bar")); @@ -148,7 +162,7 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest { ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); - verify(ctx, timeout(10_000)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); + verify(ctx, timeout(TIMEOUT)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); assertNotSame(metaData, metadataCaptor.getValue()); assertEquals(TbMsg.EMPTY_JSON_OBJECT, dataCaptor.getValue()); @@ -183,6 +197,8 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest { } }); + given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE); + TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration(); config.setRequestMethod("DELETE"); config.setHeaders(Collections.singletonMap("Foo", "Bar")); @@ -206,12 +222,143 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest { ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); - verify(ctx, timeout(10_000)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); + verify(ctx, timeout(TIMEOUT)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); assertNotSame(metaData, metadataCaptor.getValue()); assertEquals(TbMsg.EMPTY_JSON_OBJECT, dataCaptor.getValue()); } + @Test + public void givenForceAckTrue_whenOnMsgAndServerReturns200_thenAckedImmediatelyAndEnqueuedForTellNext() throws IOException { + final String path = "/path/to/get"; + setupServer("*", new HttpRequestHandler() { + @Override + public void handle(HttpRequest request, HttpResponse response, HttpContext context) + throws HttpException, IOException { + response.setStatusCode(200); + } + }); + + TbMsg transformedMsg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(originator) + .copyMetaData(metaData) + .dataType(TbMsgDataType.JSON) + .data(TbMsg.EMPTY_JSON_OBJECT) + .ruleChainId(ruleChainId) + .ruleNodeId(ruleNodeId) + .build(); + + given(ctx.isExternalNodeForceAck()).willReturn(true); + given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE); + given(ctx.transformMsg(any(), any(), any())).willReturn(transformedMsg); + + TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration(); + config.setRequestMethod("GET"); + config.setIgnoreRequestBody(true); + config.setRestEndpointUrlPattern(String.format("http://localhost:%d%s", server.getLocalPort(), path)); + initWithConfig(config); + + TbMsg msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(originator) + .copyMetaData(metaData) + .dataType(TbMsgDataType.JSON) + .data(TbMsg.EMPTY_JSON_OBJECT) + .ruleChainId(ruleChainId) + .ruleNodeId(ruleNodeId) + .build(); + restNode.onMsg(ctx, msg); + + verify(ctx).ack(msg); + verify(ctx, timeout(TIMEOUT)).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS)); + verify(ctx, never()).tellSuccess(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void givenMaxParallelRequestsCountAndBadUrl_whenOnMsg_thenSemaphoreIsReleasedAndFailureReported(boolean forceAck) throws IOException { + given(ctx.isExternalNodeForceAck()).willReturn(forceAck); + + TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration(); + config.setMaxParallelRequestsCount(1); + config.setRestEndpointUrlPattern(""); + initWithConfig(config); + + TbMsg msg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(originator) + .copyMetaData(metaData) + .dataType(TbMsgDataType.JSON) + .data(TbMsg.EMPTY_JSON_OBJECT) + .ruleChainId(ruleChainId) + .ruleNodeId(ruleNodeId) + .build(); + restNode.onMsg(ctx, msg); + + assertThat(restNode.httpClient.getSemaphore().availablePermits()).isEqualTo(1); + if (forceAck) { + verify(ctx).enqueueForTellFailure(any(), any(Throwable.class)); + } else { + verify(ctx).tellFailure(any(), any()); + } + } + + @Test + public void givenMaxPendingRequestsExceeded_whenOnMsg_thenFailsImmediatelyAndQueuedRequestFiresAfterSlotOpens() throws IOException, InterruptedException { + CountDownLatch releaseResponse = new CountDownLatch(1); + setupServer("*", (request, response, context) -> { + try { + releaseResponse.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + response.setStatusCode(200); + }); + + given(ctx.isExternalNodeForceAck()).willReturn(false); + given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE); + // Simulate server-level cap: maxPendingRequests=1 via TbHttpClientSettings + given(ctx.getTbHttpClientSettings()).willReturn(new TbHttpClientSettings() { + @Override public int getMaxParallelRequests() { return 0; } + @Override public int getMaxPendingRequests() { return 1; } + @Override public int getPoolMaxConnections() { return 0; } + }); + TbMsg transformedMsg = TbMsg.newMsg() + .type(TbMsgType.POST_TELEMETRY_REQUEST) + .originator(originator) + .copyMetaData(metaData) + .data(TbMsg.EMPTY_JSON_OBJECT) + .build(); + given(ctx.transformMsg(any(), any(), any())).willReturn(transformedMsg); + + TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration(); + config.setMaxParallelRequestsCount(1); + config.setRequestMethod("GET"); + config.setIgnoreRequestBody(true); + config.setRestEndpointUrlPattern(String.format("http://localhost:%d/path", server.getLocalPort())); + initWithConfig(config); + + TbMsg msg1 = TbMsg.newMsg().type(TbMsgType.POST_TELEMETRY_REQUEST).originator(originator) + .copyMetaData(metaData).dataType(TbMsgDataType.JSON).data(TbMsg.EMPTY_JSON_OBJECT) + .ruleChainId(ruleChainId).ruleNodeId(ruleNodeId).build(); + TbMsg msg2 = TbMsg.newMsg().type(TbMsgType.POST_TELEMETRY_REQUEST).originator(originator) + .copyMetaData(metaData).dataType(TbMsgDataType.JSON).data(TbMsg.EMPTY_JSON_OBJECT) + .ruleChainId(ruleChainId).ruleNodeId(ruleNodeId).build(); + TbMsg msg3 = TbMsg.newMsg().type(TbMsgType.POST_TELEMETRY_REQUEST).originator(originator) + .copyMetaData(metaData).dataType(TbMsgDataType.JSON).data(TbMsg.EMPTY_JSON_OBJECT) + .ruleChainId(ruleChainId).ruleNodeId(ruleNodeId).build(); + + restNode.onMsg(ctx, msg1); // fires immediately (semaphore acquired) + restNode.onMsg(ctx, msg2); // queues (semaphore exhausted, queue has room) + restNode.onMsg(ctx, msg3); // fails immediately (queue full — server-level maxPendingRequests=1) + + verify(ctx, timeout(TIMEOUT)).tellFailure(any(), any()); + + releaseResponse.countDown(); + verify(ctx, timeout(TIMEOUT).times(2)).tellSuccess(any()); + } + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { return Stream.of( // config for version 2 with upgrade from version 0