45 changed files with 1413 additions and 176 deletions
@ -0,0 +1,146 @@ |
|||
/** |
|||
* Copyright © 2016-2026 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.common; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.queue.TbQueueMsg; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.hamcrest.MatcherAssert.assertThat; |
|||
import static org.hamcrest.Matchers.empty; |
|||
import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
|||
import static org.hamcrest.Matchers.is; |
|||
import static org.mockito.ArgumentMatchers.anyLong; |
|||
import static org.mockito.BDDMockito.never; |
|||
import static org.mockito.BDDMockito.spy; |
|||
import static org.mockito.BDDMockito.times; |
|||
import static org.mockito.BDDMockito.verify; |
|||
|
|||
@Slf4j |
|||
@ExtendWith(MockitoExtension.class) |
|||
public class AbstractTbQueueConsumerTemplateTest { |
|||
|
|||
private static final long POLL_DURATION_MS = 100L; |
|||
private static final long SLEEP_TOLERANCE_MS = 20L; |
|||
|
|||
@Test |
|||
public void givenEmptyPartitionsAndLongPollingSupported_whenPoll_thenSleepsAndDoesNotCallDoPoll() { |
|||
// Regression: with empty partitions AND isLongPollingSupported()==true (e.g. Kafka),
|
|||
// poll() previously returned instantly with no sleep, causing the consumer loop to busy-spin.
|
|||
TestConsumer consumer = spy(new TestConsumer("test-topic", true)); |
|||
consumer.subscribe(Collections.emptySet()); |
|||
|
|||
long startNs = System.nanoTime(); |
|||
List<TbQueueMsg> result = consumer.poll(POLL_DURATION_MS); |
|||
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); |
|||
|
|||
assertThat(result, is(empty())); |
|||
verify(consumer, never()).doPoll(anyLong()); |
|||
assertThat("poll() must sleep ~durationInMillis when partitions are empty (no busy-wait)", |
|||
elapsedMs, greaterThanOrEqualTo(POLL_DURATION_MS - SLEEP_TOLERANCE_MS)); |
|||
} |
|||
|
|||
@Test |
|||
public void givenEmptyPartitionsAndNoLongPolling_whenPoll_thenSleepsAndDoesNotCallDoPoll() { |
|||
TestConsumer consumer = spy(new TestConsumer("test-topic", false)); |
|||
consumer.subscribe(Collections.emptySet()); |
|||
|
|||
long startNs = System.nanoTime(); |
|||
List<TbQueueMsg> result = consumer.poll(POLL_DURATION_MS); |
|||
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); |
|||
|
|||
assertThat(result, is(empty())); |
|||
verify(consumer, never()).doPoll(anyLong()); |
|||
assertThat(elapsedMs, greaterThanOrEqualTo(POLL_DURATION_MS - SLEEP_TOLERANCE_MS)); |
|||
} |
|||
|
|||
@Test |
|||
public void givenNonEmptyPartitions_whenPoll_thenCallsDoPoll() { |
|||
TestConsumer consumer = spy(new TestConsumer("test-topic", true)); |
|||
consumer.subscribe(Collections.singleton(new TopicPartitionInfo("test-topic", null, 0, true))); |
|||
|
|||
List<TbQueueMsg> result = consumer.poll(POLL_DURATION_MS); |
|||
|
|||
assertThat(result, is(empty())); |
|||
verify(consumer, times(1)).doPoll(POLL_DURATION_MS); |
|||
} |
|||
|
|||
@Test |
|||
public void givenPartitionsBecomeEmptyAfterRebalance_whenPollAgain_thenStopsCallingDoPoll() { |
|||
// Reproduces the observed trigger: a rebalance leaves the consumer with an empty
|
|||
// partition assignment. Subsequent poll() calls must not busy-spin or call doPoll().
|
|||
TestConsumer consumer = spy(new TestConsumer("test-topic", true)); |
|||
consumer.subscribe(Collections.singleton(new TopicPartitionInfo("test-topic", null, 0, true))); |
|||
consumer.poll(POLL_DURATION_MS); |
|||
verify(consumer, times(1)).doPoll(POLL_DURATION_MS); |
|||
|
|||
consumer.subscribe(Collections.emptySet()); |
|||
|
|||
long startNs = System.nanoTime(); |
|||
List<TbQueueMsg> result = consumer.poll(POLL_DURATION_MS); |
|||
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); |
|||
|
|||
assertThat(result, is(empty())); |
|||
verify(consumer, times(1)).doPoll(anyLong()); |
|||
assertThat(elapsedMs, greaterThanOrEqualTo(POLL_DURATION_MS - SLEEP_TOLERANCE_MS)); |
|||
} |
|||
|
|||
static class TestConsumer extends AbstractTbQueueConsumerTemplate<Object, TbQueueMsg> { |
|||
|
|||
private final boolean longPollingSupported; |
|||
|
|||
TestConsumer(String topic, boolean longPollingSupported) { |
|||
super(topic); |
|||
this.longPollingSupported = longPollingSupported; |
|||
} |
|||
|
|||
@Override |
|||
protected List<Object> doPoll(long durationInMillis) { |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
@Override |
|||
protected TbQueueMsg decode(Object record) { |
|||
return null; |
|||
} |
|||
|
|||
@Override |
|||
protected void doSubscribe(Set<TopicPartitionInfo> partitions) { |
|||
} |
|||
|
|||
@Override |
|||
protected void doCommit() { |
|||
} |
|||
|
|||
@Override |
|||
protected void doUnsubscribe() { |
|||
} |
|||
|
|||
@Override |
|||
protected boolean isLongPollingSupported() { |
|||
return longPollingSupported; |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
/** |
|||
* Copyright © 2016-2026 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.msa.security; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import org.testng.annotations.AfterClass; |
|||
import org.testng.annotations.BeforeClass; |
|||
import org.testng.annotations.Test; |
|||
import org.thingsboard.server.msa.AbstractContainerTest; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
public class JsExecutorSandboxIsolationTest extends AbstractContainerTest { |
|||
|
|||
@BeforeClass |
|||
public void beforeClass() { |
|||
testRestClient.login("tenant@thingsboard.org", "tenant"); |
|||
} |
|||
|
|||
@AfterClass |
|||
public void afterClass() { |
|||
testRestClient.resetToken(); |
|||
} |
|||
|
|||
/** |
|||
* Black-box regression for JVN#16937365: a tenant admin must not be able |
|||
* to escape the tb-js-executor sandbox via the host-realm prototype chain |
|||
* exposed through the script's `args` argument. Runs against the live |
|||
* docker-compose deployment, which uses script.use_sandbox=true and |
|||
* JS_EVALUATOR=remote (Kafka -> tb-js-executor). |
|||
*/ |
|||
@Test |
|||
public void testRuleChainScriptCannotReachHostProcess() { |
|||
JsonNode response = testRestClient.testRuleChainScript(""" |
|||
{ |
|||
"script": "var F = args.constructor.constructor; var p = F('return process')(); return { reachedHost: !!(p && p.mainModule) };", |
|||
"scriptType": "update", |
|||
"argNames": ["msg", "metadata", "msgType"], |
|||
"msg": "{}", |
|||
"metadata": {}, |
|||
"msgType": "POST_TELEMETRY_REQUEST" |
|||
} |
|||
"""); |
|||
|
|||
// The sandboxed run must reject the escape attempt: the host `process`
|
|||
// global is not defined inside the sandbox realm, so executing the
|
|||
// synthesized function `F("return process")` throws.
|
|||
assertThat(response.has("error")).isTrue(); |
|||
String error = response.get("error").asText(); |
|||
assertThat(error) |
|||
.as("sandbox must block host-realm reach via args.constructor.constructor; full error: %s", error) |
|||
.contains("process is not defined"); |
|||
|
|||
// Defense in depth: even if the script somehow returned, output must
|
|||
// not indicate that the host process was reached.
|
|||
if (response.hasNonNull("output")) { |
|||
assertThat(response.get("output").asText()).doesNotContain("\"reachedHost\":true"); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,83 @@ |
|||
///
|
|||
/// Copyright © 2016-2026 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import { describe, test } from 'node:test'; |
|||
import assert from 'node:assert/strict'; |
|||
import { JsExecutor } from '../api/jsExecutor'; |
|||
|
|||
// describe('js-executor') groups all cases under <testsuite name="js-executor">
|
|||
// in the JUnit XML so they show up under that suite in TeamCity's Tests tab,
|
|||
// alongside thousands of Java tests.
|
|||
describe('js-executor', () => { |
|||
|
|||
test('sandbox isolates args from host realm (JVN#16937365)', async () => { |
|||
const exec = new JsExecutor(true); |
|||
const script = await exec.compileScript(`function(msg, metadata, msgType){
|
|||
var F = args.constructor.constructor; |
|||
var p = F("return process")(); |
|||
return p && p.mainModule ? 'reached-host' : 'isolated'; |
|||
}`);
|
|||
await assert.rejects( |
|||
exec.executeScript(script, ['{}', '{}', 'POST_TELEMETRY_REQUEST'], 5000), |
|||
/process is not defined/, |
|||
'host process must not be reachable from inside the sandbox', |
|||
); |
|||
}); |
|||
|
|||
test('sandbox passes string args through unchanged', async () => { |
|||
const exec = new JsExecutor(true); |
|||
const script = await exec.compileScript(`function(msg, metadata, msgType){
|
|||
return { msgIsString: typeof msg === 'string', count: args.length, first: args[0] }; |
|||
}`);
|
|||
const out = await exec.executeScript(script, ['hello', '{}', 'X'], 5000); |
|||
// Field-by-field: the returned object is owned by the sandbox realm, so
|
|||
// its prototype is not the host Object.prototype and deepStrictEqual would
|
|||
// reject it on prototype mismatch even when the values match.
|
|||
assert.equal(out.msgIsString, true); |
|||
assert.equal(out.count, 3); |
|||
assert.equal(out.first, 'hello'); |
|||
}); |
|||
|
|||
// The use_sandbox=false path is intentionally non-isolating: scripts compile
|
|||
// and run in the host realm via vm.compileFunction. The two tests below codify
|
|||
// that documented contract so any future behavior change shows up as a test
|
|||
// failure and forces a deliberate update of the docs and threat model.
|
|||
|
|||
test('non-sandbox path does not isolate from host realm (documented contract)', async () => { |
|||
const exec = new JsExecutor(false); |
|||
const script = await exec.compileScript(`function(msg, metadata, msgType){
|
|||
// Non-destructive host-reach probe: typeof process.platform is 'string'
|
|||
// only if the host process object is reachable.
|
|||
var F = args.constructor.constructor; |
|||
return F('return typeof process.platform')(); |
|||
}`);
|
|||
const out = await exec.executeScript(script, ['{}', '{}', 'X']); |
|||
assert.equal(out, 'string', |
|||
'use_sandbox=false is documented as non-isolating; if this fails, the path was changed and docs/threat model must be updated'); |
|||
}); |
|||
|
|||
test('non-sandbox path passes string args through unchanged', async () => { |
|||
const exec = new JsExecutor(false); |
|||
const script = await exec.compileScript(`function(msg, metadata, msgType){
|
|||
return { msgIsString: typeof msg === 'string', count: args.length, first: args[0] }; |
|||
}`);
|
|||
const out = await exec.executeScript(script, ['hello', '{}', 'X']); |
|||
assert.equal(out.msgIsString, true); |
|||
assert.equal(out.count, 3); |
|||
assert.equal(out.first, 'hello'); |
|||
}); |
|||
|
|||
}); // describe('js-executor')
|
|||
Loading…
Reference in new issue