Browse Source

Fix REST API Call node blocking actor thread and semaphore permit leak

Replace the blocking semaphore guard with a non-blocking bounded FIFO queue
+ semaphore pattern:

- No semaphore/queue when maxParallelRequestsCount=0 (default): direct doHttpCall,
  identical to the old behavior.
- When a concurrency limit is set, incoming messages are enqueued via non-blocking
  offer(); a full queue triggers onFailure immediately.
- tryProcess() acquires one semaphore slot and dispatches the next valid queued task.
  Stale tasks (batch deadline expired) are dropped and the slot reused in the same pass.
- doFinally hook releases the semaphore and calls tryProcess() exactly once after any
  terminal signal (success, error, cancel), preventing double-release and permit leaks.
- publishOn(externalCallExecutor) moves callbacks off reactor-netty I/O threads.

System-level safety caps are wired through thingsboard.yml → ActorSystemContext →
TbContext → TbHttpClient, scoped to rule-engine services only via @TbRuleEngineComponent:
  actors.rule.external.http_client.max_parallel_requests (ACTORS_RULE_EXTERNAL_HTTP_CLIENT_MAX_PARALLEL_REQUESTS)
  actors.rule.external.http_client.max_pending_requests  (ACTORS_RULE_EXTERNAL_HTTP_CLIENT_MAX_PENDING_REQUESTS)
  actors.rule.external.http_client.pool_max_connections  (ACTORS_RULE_EXTERNAL_HTTP_CLIENT_POOL_MAX_CONNECTIONS)
Backward compat: TB_RE_HTTP_CLIENT_POOL_MAX_CONNECTIONS still honored via yaml fallback.

Observability: five AtomicLong counters (dispatched, success, failure, droppedQueueFull,
droppedStale) with periodic WARN anomaly logging including semaphorePermits for leak detection.

No configuration changes or upgrade scripts required — docker image update is sufficient.

Rename RestApiCallNodeSettings to TbHttpClientSettings

The settings are about HTTP client transport concerns (connection pool,
concurrency, queue depth), not REST API Call node business logic.
The new name matches the consumer (TbHttpClient) and the YAML path
(actors.rule.external.http_client.*).
pull/15334/head
Sergey Matvienko 2 months ago
parent
commit
5931921c04
  1. 5
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 6
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 51
      application/src/main/java/org/thingsboard/server/config/TbHttpClientSettingsComponent.java
  4. 12
      application/src/main/resources/thingsboard.yml
  5. 6
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  6. 49
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbHttpClientSettings.java
  7. 200
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java
  8. 7
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java
  9. 2
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java
  10. 151
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbRestApiCallNodeTest.java

5
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -35,6 +35,7 @@ import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.MqttClientSettings;
import org.thingsboard.rule.engine.api.TbHttpClientSettings;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineAiChatModelService;
import org.thingsboard.rule.engine.api.SmsService;
@ -684,6 +685,10 @@ public class ActorSystemContext {
@Getter
private MqttClientSettings mqttClientSettings;
@Autowired(required = false)
@Getter
private TbHttpClientSettings tbHttpClientSettings;
@Getter
@Setter
private TbActorSystem actorSystem;

6
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -27,6 +27,7 @@ import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.MqttClientSettings;
import org.thingsboard.rule.engine.api.TbHttpClientSettings;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineAiChatModelService;
import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
@ -1062,6 +1063,11 @@ public class DefaultTbContext implements TbContext {
return mainCtx.getMqttClientSettings();
}
@Override
public TbHttpClientSettings getTbHttpClientSettings() {
return mainCtx.getTbHttpClientSettings();
}
private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ruleNodeId", ruleNodeId.toString());

51
application/src/main/java/org/thingsboard/server/config/TbHttpClientSettingsComponent.java

@ -0,0 +1,51 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.TbHttpClientSettings;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
@TbRuleEngineComponent
@Component
public class TbHttpClientSettingsComponent implements TbHttpClientSettings {
@Value("${actors.rule.external.http_client.max_parallel_requests:0}")
private int maxParallelRequests;
@Value("${actors.rule.external.http_client.max_pending_requests:0}")
private int maxPendingRequests;
@Value("${actors.rule.external.http_client.pool_max_connections:0}")
private int poolMaxConnections;
@Override
public int getMaxParallelRequests() {
return maxParallelRequests;
}
@Override
public int getMaxPendingRequests() {
return maxPendingRequests;
}
@Override
public int getPoolMaxConnections() {
return poolMaxConnections;
}
}

12
application/src/main/resources/thingsboard.yml

@ -567,6 +567,18 @@ actors:
# Use this when your rule chains need to reach devices on private networks (e.g., 192.168.1.0/24).
# Example: "192.168.1.0/24,10.0.0.0/8,my-internal-service.corp"
ssrf_allowed_hosts: "${SSRF_ALLOWED_HOSTS:}"
http_client:
# Server-level ceiling for parallel in-flight HTTP requests per external HTTP rule node instance.
# Applied as min(nodeConfig, systemMax) when set; 0 = no system-level restriction (node config wins).
max_parallel_requests: "${ACTORS_RULE_EXTERNAL_HTTP_CLIENT_MAX_PARALLEL_REQUESTS:0}"
# Server-level ceiling for the pending-request queue depth per external HTTP rule node instance.
# Applied as min(nodeConfig, systemMax) when set; 0 = no system-level restriction.
max_pending_requests: "${ACTORS_RULE_EXTERNAL_HTTP_CLIENT_MAX_PENDING_REQUESTS:0}"
# Maximum number of TCP connections in the reactor-netty connection pool per external HTTP rule node instance.
# Defaults to reactor-netty's ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS: max(availableProcessors, 8) * 2
# (e.g. 16 on an 8-core host). Increase for high-throughput nodes calling remote services that support many connections.
# 0 = use reactor-netty default.
pool_max_connections: "${ACTORS_RULE_EXTERNAL_HTTP_CLIENT_POOL_MAX_CONNECTIONS:${TB_RE_HTTP_CLIENT_POOL_MAX_CONNECTIONS:0}}"
rpc:
# Maximum number of persistent RPC call retries in case of failed request delivery.
max_retries: "${ACTORS_RPC_MAX_RETRIES:5}"

6
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -438,4 +438,10 @@ public interface TbContext {
MqttClientSettings getMqttClientSettings();
// Server-level safety caps for the HTTP client used by the REST API Call rule node (read from thingsboard.yml)
default TbHttpClientSettings getTbHttpClientSettings() {
return TbHttpClientSettings.DEFAULT;
}
}

49
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbHttpClientSettings.java

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
/**
* Server-level safety caps for the HTTP client used by the REST API Call rule node.
* Values are read from {@code thingsboard.yml} (or the corresponding environment variables)
* and applied as hard ceilings on top of the per-node tenant configuration.
* A value of {@code 0} means no system-level restriction.
*/
public interface TbHttpClientSettings {
/** System ceiling for {@code maxParallelRequestsCount}. 0 = no system limit. */
int getMaxParallelRequests();
/** System ceiling for the pending-request queue depth. 0 = no system limit. */
int getMaxPendingRequests();
/**
* Maximum number of TCP connections in the reactor-netty pool per node instance.
* 0 = use reactor-netty's default: {@code max(availableProcessors, 8) * 2}.
*/
int getPoolMaxConnections();
TbHttpClientSettings DEFAULT = new TbHttpClientSettings() {
@Override
public int getMaxParallelRequests() { return 0; }
@Override
public int getMaxPendingRequests() { return 0; }
@Override
public int getPoolMaxConnections() { return 0; }
};
}

200
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java

@ -33,6 +33,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
import org.springframework.web.util.UriComponentsBuilder;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.SsrfProtectionValidator;
import org.thingsboard.rule.engine.api.TbHttpClientSettings;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
@ -42,6 +43,7 @@ import org.thingsboard.rule.engine.credentials.CredentialsType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.transport.ProxyProvider;
@ -53,8 +55,11 @@ import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -84,17 +89,48 @@ public class TbHttpClient {
public static final String MAX_IN_MEMORY_BUFFER_SIZE_IN_KB = "tb.http.maxInMemoryBufferSizeInKb";
private static final long ANOMALY_REPORT_INTERVAL_MS = 60_000;
private final TbRestApiCallNodeConfiguration config;
private final String tenantId;
private final String nodeId;
private final TbHttpClientSettings settings;
private EventLoopGroup eventLoopGroup;
private WebClient webClient;
private Semaphore semaphore;
private BlockingQueue<PendingTask> pendingQueue;
private final AtomicLong dispatchedCount = new AtomicLong();
private final AtomicLong successCount = new AtomicLong();
private final AtomicLong failureCount = new AtomicLong();
private final AtomicLong droppedFullCount = new AtomicLong();
private final AtomicLong droppedStaleCount = new AtomicLong();
private volatile long lastAnomalyReportAt = 0;
private record PendingTask(
TbContext ctx,
TbMsg msg,
Consumer<TbMsg> onSuccess,
BiConsumer<TbMsg, Throwable> onFailure,
long enqueuedNanos) {}
TbHttpClient(TbRestApiCallNodeConfiguration config, EventLoopGroup eventLoopGroupShared) throws TbNodeException {
this(config, eventLoopGroupShared, "n/a", "n/a", TbHttpClientSettings.DEFAULT);
}
TbHttpClient(TbRestApiCallNodeConfiguration config, EventLoopGroup eventLoopGroupShared,
String tenantId, String nodeId, TbHttpClientSettings settings) throws TbNodeException {
try {
this.config = config;
if (config.getMaxParallelRequestsCount() > 0) {
semaphore = new Semaphore(config.getMaxParallelRequestsCount());
this.tenantId = tenantId;
this.nodeId = nodeId;
this.settings = settings;
int effectiveParallel = effectiveMax(config.getMaxParallelRequestsCount(), settings.getMaxParallelRequests());
if (effectiveParallel > 0) {
semaphore = new Semaphore(effectiveParallel);
int effectivePending = effectiveMax(0, settings.getMaxPendingRequests());
pendingQueue = effectivePending > 0 ? new LinkedBlockingQueue<>(effectivePending) : new LinkedBlockingQueue<>();
}
ConnectionProvider connectionProvider = ConnectionProvider
@ -155,16 +191,20 @@ public class TbHttpClient {
}
}
private int getPoolMaxConnections() {
String poolMaxConnectionsEnv = System.getenv("TB_RE_HTTP_CLIENT_POOL_MAX_CONNECTIONS");
/**
* Returns the effective limit: {@code min(userMax, systemMax)} when both are positive,
* {@code systemMax} when only the system ceiling is set, or {@code userMax} otherwise.
* A value of {@code 0} means unlimited.
*/
private static int effectiveMax(int userMax, int systemMax) {
if (systemMax <= 0) return userMax;
if (userMax <= 0) return systemMax;
return Math.min(userMax, systemMax);
}
int poolMaxConnections;
if (poolMaxConnectionsEnv != null) {
poolMaxConnections = Integer.parseInt(poolMaxConnectionsEnv);
} else {
poolMaxConnections = ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS;
}
return poolMaxConnections;
private int getPoolMaxConnections() {
int configured = settings.getPoolMaxConnections();
return configured > 0 ? configured : ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS;
}
private void validateMaxInMemoryBufferSize(TbRestApiCallNodeConfiguration config) throws TbNodeException {
@ -207,54 +247,150 @@ public class TbHttpClient {
if (this.eventLoopGroup != null) {
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
}
long full = droppedFullCount.get();
long stale = droppedStaleCount.get();
int availablePermits = semaphore != null ? semaphore.availablePermits() : -1;
if (full > 0 || stale > 0) {
log.warn("[{}][{}] REST API call node destroyed with anomalies: " +
"droppedQueueFull={}, droppedStale={}, dispatched={}, success={}, failure={}, semaphorePermits={}.",
tenantId, nodeId, full, stale,
dispatchedCount.get(), successCount.get(), failureCount.get(), availablePermits);
} else {
log.debug("[{}][{}] REST API call node destroyed. dispatched={}, success={}, failure={}, semaphorePermits={}.",
tenantId, nodeId, dispatchedCount.get(), successCount.get(), failureCount.get(), availablePermits);
}
}
public void processMessage(TbContext ctx, TbMsg msg,
Consumer<TbMsg> onSuccess,
BiConsumer<TbMsg, Throwable> onFailure) {
try {
if (semaphore != null && !semaphore.tryAcquire(config.getReadTimeoutMs(), TimeUnit.MILLISECONDS)) {
onFailure.accept(msg, new RuntimeException("Timeout during waiting for reply!"));
return;
if (semaphore == null) {
doHttpCall(new PendingTask(ctx, msg, onSuccess, onFailure, 0L));
return;
}
if (!pendingQueue.offer(new PendingTask(ctx, msg, onSuccess, onFailure, System.nanoTime()))) {
droppedFullCount.incrementAndGet();
log.debug("[{}][{}] REST API call queue full, dropping msg {}.", tenantId, nodeId, msg.getId());
maybeReportAnomalies();
onFailure.accept(msg, new RuntimeException("Max pending requests limit exceeded!"));
return;
}
tryProcess();
}
/**
* Tries to acquire one concurrency slot and fire the next queued task.
* Stale messages (whose message pack has expired) are silently dropped.
* Safe to call from any thread under high concurrency.
*/
private void tryProcess() {
while (true) {
if (!semaphore.tryAcquire()) {
return; // all slots are in use; a callback will call tryProcess() when one frees up
}
PendingTask next = pendingQueue.poll();
if (next == null) {
semaphore.release();
return; // queue is empty; slot released
}
if (!next.msg().isValid()) {
semaphore.release();
droppedStaleCount.incrementAndGet();
log.debug("[{}][{}] Dropping stale msg {} from REST API call queue (queueDepth={}).",
tenantId, nodeId, next.msg().getId(), pendingQueue.size());
next.onFailure().accept(next.msg(), new RuntimeException("Message is no longer valid. Dropped from queue."));
maybeReportAnomalies();
continue; // slot released — loop to check if there's a valid next item
}
dispatchedCount.incrementAndGet();
if (doHttpCall(next)) {
return; // async HTTP call started — its callback will call tryProcess()
}
// synchronous failure — semaphore already released in doHttpCall, loop to try next task
}
}
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg);
private void maybeReportAnomalies() {
long now = System.currentTimeMillis();
if (now - lastAnomalyReportAt < ANOMALY_REPORT_INTERVAL_MS) {
return;
}
lastAnomalyReportAt = now;
int queueSize = pendingQueue != null ? pendingQueue.size() : 0;
int queueRemaining = pendingQueue != null ? pendingQueue.remainingCapacity() : Integer.MAX_VALUE;
int availablePermits = semaphore != null ? semaphore.availablePermits() : -1;
log.warn("[{}][{}] REST API call node anomalies: droppedQueueFull={}, droppedStale={} " +
"(dispatched={}, success={}, failure={}, queueDepth={}, queueRemaining={}, semaphorePermits={}).",
tenantId, nodeId,
droppedFullCount.get(), droppedStaleCount.get(),
dispatchedCount.get(), successCount.get(), failureCount.get(),
queueSize, queueRemaining, availablePermits);
}
/**
* Initiates an async HTTP call for the given task.
*
* @return {@code true} if the async subscription was started and the semaphore slot is now
* owned by the callback (which will release it and call {@link #tryProcess()}).
* {@code false} if a synchronous exception occurred before the subscription was
* registered; the semaphore slot has already been released and the caller should
* loop rather than recurse to avoid stack overflow when many queued tasks fail
* synchronously (e.g. misconfigured URL pattern).
*/
private boolean doHttpCall(PendingTask task) {
boolean asyncStarted = false;
try {
String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), task.msg());
HttpMethod method = HttpMethod.valueOf(config.getRequestMethod());
URI uri = buildEncodedUri(endpointUrl);
RequestBodySpec request = webClient
RequestBodySpec req = webClient
.method(method)
.uri(uri)
.headers(headers -> prepareHeaders(headers, msg));
.headers(headers -> prepareHeaders(headers, task.msg()));
if ((HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method) ||
HttpMethod.PATCH.equals(method) || HttpMethod.DELETE.equals(method)) &&
!config.isIgnoreRequestBody()) {
request.body(BodyInserters.fromValue(getData(msg, config.isParseToPlainText())));
req.body(BodyInserters.fromValue(getData(task.msg(), config.isParseToPlainText())));
}
request
.retrieve()
req.retrieve()
.toEntity(String.class)
.subscribe(responseEntity -> {
.publishOn(Schedulers.fromExecutor(task.ctx().getExternalCallExecutor()))
.doFinally(signalType -> {
// Runs exactly once after onComplete, onError, or cancel — the only
// place that releases the permit for the async path.
if (semaphore != null) {
semaphore.release();
tryProcess();
}
})
.subscribe(responseEntity -> {
if (responseEntity.getStatusCode().is2xxSuccessful()) {
onSuccess.accept(processResponse(ctx, msg, responseEntity));
successCount.incrementAndGet();
task.onSuccess().accept(processResponse(task.ctx(), task.msg(), responseEntity));
} else {
onFailure.accept(processFailureResponse(msg, responseEntity), null);
failureCount.incrementAndGet();
task.onFailure().accept(processFailureResponse(task.msg(), responseEntity), null);
}
}, throwable -> {
if (semaphore != null) {
semaphore.release();
}
onFailure.accept(processException(msg, throwable), processThrowable(throwable));
failureCount.incrementAndGet();
task.onFailure().accept(processException(task.msg(), throwable), processThrowable(throwable));
});
} catch (InterruptedException e) {
log.warn("Timeout during waiting for reply!", e);
asyncStarted = true;
return true;
} catch (Exception e) {
failureCount.incrementAndGet();
task.onFailure().accept(processException(task.msg(), e), processThrowable(e));
return false;
} finally {
// Synchronous permit release: only when the async pipeline was never registered
// (asyncStarted=false). If it was, doFinally owns the release.
// tryProcess() is intentionally not called here — the caller loops iteratively.
if (!asyncStarted && semaphore != null) {
semaphore.release();
}
}
}

7
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.rest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.thingsboard.rule.engine.api.TbHttpClientSettings;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
@ -58,7 +59,11 @@ public class TbRestApiCallNode extends TbAbstractExternalNode {
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class);
httpClient = new TbHttpClient(config, ctx.getSharedEventLoop());
TbHttpClientSettings httpClientSettings = ctx.getTbHttpClientSettings();
httpClient = new TbHttpClient(config, ctx.getSharedEventLoop(),
ctx.getTenantId() != null ? ctx.getTenantId().getId().toString() : "n/a",
ctx.getSelfId() != null ? ctx.getSelfId().getId().toString() : "n/a",
httpClientSettings != null ? httpClientSettings : TbHttpClientSettings.DEFAULT);
}
@Override

2
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java

@ -27,6 +27,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockserver.integration.ClientAndServer;
import org.springframework.util.LinkedMultiValueMap;
import org.thingsboard.common.util.DirectListeningExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
@ -156,6 +157,7 @@ public class TbHttpClientTest {
.build();
var ctx = mock(TbContext.class);
when(ctx.getExternalCallExecutor()).thenReturn(DirectListeningExecutor.INSTANCE);
when(ctx.transformMsg(
eq(msg),
eq(msg.getMetaData()),

151
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbRestApiCallNodeTest.java

@ -27,13 +27,17 @@ import org.apache.http.protocol.HttpRequestHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.DirectListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbHttpClientSettings;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
@ -43,6 +47,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -53,15 +58,22 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest {
static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
@Spy
private TbRestApiCallNode restNode;
@ -125,6 +137,8 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest {
}
});
given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE);
TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration();
config.setRequestMethod("DELETE");
config.setHeaders(Collections.singletonMap("Foo", "Bar"));
@ -148,7 +162,7 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest {
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
verify(ctx, timeout(10_000)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
verify(ctx, timeout(TIMEOUT)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
assertNotSame(metaData, metadataCaptor.getValue());
assertEquals(TbMsg.EMPTY_JSON_OBJECT, dataCaptor.getValue());
@ -183,6 +197,8 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest {
}
});
given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE);
TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration();
config.setRequestMethod("DELETE");
config.setHeaders(Collections.singletonMap("Foo", "Bar"));
@ -206,12 +222,143 @@ public class TbRestApiCallNodeTest extends AbstractRuleNodeUpgradeTest {
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
verify(ctx, timeout(10_000)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
verify(ctx, timeout(TIMEOUT)).transformMsg(msgCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
assertNotSame(metaData, metadataCaptor.getValue());
assertEquals(TbMsg.EMPTY_JSON_OBJECT, dataCaptor.getValue());
}
@Test
public void givenForceAckTrue_whenOnMsgAndServerReturns200_thenAckedImmediatelyAndEnqueuedForTellNext() throws IOException {
final String path = "/path/to/get";
setupServer("*", new HttpRequestHandler() {
@Override
public void handle(HttpRequest request, HttpResponse response, HttpContext context)
throws HttpException, IOException {
response.setStatusCode(200);
}
});
TbMsg transformedMsg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(originator)
.copyMetaData(metaData)
.dataType(TbMsgDataType.JSON)
.data(TbMsg.EMPTY_JSON_OBJECT)
.ruleChainId(ruleChainId)
.ruleNodeId(ruleNodeId)
.build();
given(ctx.isExternalNodeForceAck()).willReturn(true);
given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE);
given(ctx.transformMsg(any(), any(), any())).willReturn(transformedMsg);
TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration();
config.setRequestMethod("GET");
config.setIgnoreRequestBody(true);
config.setRestEndpointUrlPattern(String.format("http://localhost:%d%s", server.getLocalPort(), path));
initWithConfig(config);
TbMsg msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(originator)
.copyMetaData(metaData)
.dataType(TbMsgDataType.JSON)
.data(TbMsg.EMPTY_JSON_OBJECT)
.ruleChainId(ruleChainId)
.ruleNodeId(ruleNodeId)
.build();
restNode.onMsg(ctx, msg);
verify(ctx).ack(msg);
verify(ctx, timeout(TIMEOUT)).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS));
verify(ctx, never()).tellSuccess(any());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void givenMaxParallelRequestsCountAndBadUrl_whenOnMsg_thenSemaphoreIsReleasedAndFailureReported(boolean forceAck) throws IOException {
given(ctx.isExternalNodeForceAck()).willReturn(forceAck);
TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration();
config.setMaxParallelRequestsCount(1);
config.setRestEndpointUrlPattern("");
initWithConfig(config);
TbMsg msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(originator)
.copyMetaData(metaData)
.dataType(TbMsgDataType.JSON)
.data(TbMsg.EMPTY_JSON_OBJECT)
.ruleChainId(ruleChainId)
.ruleNodeId(ruleNodeId)
.build();
restNode.onMsg(ctx, msg);
assertThat(restNode.httpClient.getSemaphore().availablePermits()).isEqualTo(1);
if (forceAck) {
verify(ctx).enqueueForTellFailure(any(), any(Throwable.class));
} else {
verify(ctx).tellFailure(any(), any());
}
}
@Test
public void givenMaxPendingRequestsExceeded_whenOnMsg_thenFailsImmediatelyAndQueuedRequestFiresAfterSlotOpens() throws IOException, InterruptedException {
CountDownLatch releaseResponse = new CountDownLatch(1);
setupServer("*", (request, response, context) -> {
try {
releaseResponse.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
response.setStatusCode(200);
});
given(ctx.isExternalNodeForceAck()).willReturn(false);
given(ctx.getExternalCallExecutor()).willReturn(DirectListeningExecutor.INSTANCE);
// Simulate server-level cap: maxPendingRequests=1 via TbHttpClientSettings
given(ctx.getTbHttpClientSettings()).willReturn(new TbHttpClientSettings() {
@Override public int getMaxParallelRequests() { return 0; }
@Override public int getMaxPendingRequests() { return 1; }
@Override public int getPoolMaxConnections() { return 0; }
});
TbMsg transformedMsg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(originator)
.copyMetaData(metaData)
.data(TbMsg.EMPTY_JSON_OBJECT)
.build();
given(ctx.transformMsg(any(), any(), any())).willReturn(transformedMsg);
TbRestApiCallNodeConfiguration config = new TbRestApiCallNodeConfiguration().defaultConfiguration();
config.setMaxParallelRequestsCount(1);
config.setRequestMethod("GET");
config.setIgnoreRequestBody(true);
config.setRestEndpointUrlPattern(String.format("http://localhost:%d/path", server.getLocalPort()));
initWithConfig(config);
TbMsg msg1 = TbMsg.newMsg().type(TbMsgType.POST_TELEMETRY_REQUEST).originator(originator)
.copyMetaData(metaData).dataType(TbMsgDataType.JSON).data(TbMsg.EMPTY_JSON_OBJECT)
.ruleChainId(ruleChainId).ruleNodeId(ruleNodeId).build();
TbMsg msg2 = TbMsg.newMsg().type(TbMsgType.POST_TELEMETRY_REQUEST).originator(originator)
.copyMetaData(metaData).dataType(TbMsgDataType.JSON).data(TbMsg.EMPTY_JSON_OBJECT)
.ruleChainId(ruleChainId).ruleNodeId(ruleNodeId).build();
TbMsg msg3 = TbMsg.newMsg().type(TbMsgType.POST_TELEMETRY_REQUEST).originator(originator)
.copyMetaData(metaData).dataType(TbMsgDataType.JSON).data(TbMsg.EMPTY_JSON_OBJECT)
.ruleChainId(ruleChainId).ruleNodeId(ruleNodeId).build();
restNode.onMsg(ctx, msg1); // fires immediately (semaphore acquired)
restNode.onMsg(ctx, msg2); // queues (semaphore exhausted, queue has room)
restNode.onMsg(ctx, msg3); // fails immediately (queue full — server-level maxPendingRequests=1)
verify(ctx, timeout(TIMEOUT)).tellFailure(any(), any());
releaseResponse.countDown();
verify(ctx, timeout(TIMEOUT).times(2)).tellSuccess(any());
}
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// config for version 2 with upgrade from version 0

Loading…
Cancel
Save