|
Before Width: | Height: | Size: 9.0 KiB After Width: | Height: | Size: 7.4 KiB |
|
Before Width: | Height: | Size: 17 KiB After Width: | Height: | Size: 16 KiB |
|
Before Width: | Height: | Size: 21 KiB After Width: | Height: | Size: 19 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 8.9 KiB |
|
Before Width: | Height: | Size: 9.0 KiB After Width: | Height: | Size: 7.4 KiB |
|
Before Width: | Height: | Size: 17 KiB After Width: | Height: | Size: 16 KiB |
|
Before Width: | Height: | Size: 9.0 KiB After Width: | Height: | Size: 7.4 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 8.9 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 8.9 KiB |
|
Before Width: | Height: | Size: 17 KiB After Width: | Height: | Size: 16 KiB |
|
Before Width: | Height: | Size: 9.0 KiB After Width: | Height: | Size: 7.4 KiB |
|
Before Width: | Height: | Size: 17 KiB After Width: | Height: | Size: 16 KiB |
|
Before Width: | Height: | Size: 10 KiB After Width: | Height: | Size: 8.9 KiB |
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.config.mqtt; |
|||
|
|||
import jakarta.validation.constraints.PositiveOrZero; |
|||
import lombok.Data; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.validation.annotation.Validated; |
|||
|
|||
@Data |
|||
@Validated |
|||
@Configuration |
|||
@ConfigurationProperties(prefix = "mqtt.client.retransmission") |
|||
public class MqttClientRetransmissionSettingsComponent { |
|||
|
|||
@PositiveOrZero |
|||
private int maxAttempts; |
|||
@PositiveOrZero |
|||
private long initialDelayMillis; |
|||
@PositiveOrZero |
|||
private double jitterFactor; |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.config.mqtt; |
|||
|
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.ToString; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.thingsboard.rule.engine.api.MqttClientSettings; |
|||
|
|||
@ToString |
|||
@EqualsAndHashCode |
|||
@Configuration |
|||
@RequiredArgsConstructor |
|||
public class MqttClientSettingsComponent implements MqttClientSettings { |
|||
|
|||
private final MqttClientRetransmissionSettingsComponent retransmissionSettingsComponent; |
|||
|
|||
@Override |
|||
public int getRetransmissionMaxAttempts() { |
|||
return retransmissionSettingsComponent.getMaxAttempts(); |
|||
} |
|||
|
|||
@Override |
|||
public long getRetransmissionInitialDelayMillis() { |
|||
return retransmissionSettingsComponent.getInitialDelayMillis(); |
|||
} |
|||
|
|||
@Override |
|||
public double getRetransmissionJitterFactor() { |
|||
return retransmissionSettingsComponent.getJitterFactor(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.security.auth; |
|||
|
|||
import jakarta.servlet.FilterChain; |
|||
import jakarta.servlet.http.HttpServletRequest; |
|||
import jakarta.servlet.http.HttpServletResponse; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.security.core.AuthenticationException; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.web.filter.OncePerRequestFilter; |
|||
import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; |
|||
|
|||
@Component |
|||
@RequiredArgsConstructor |
|||
@Slf4j |
|||
public class AuthExceptionHandler extends OncePerRequestFilter { |
|||
|
|||
private final ThingsboardErrorResponseHandler errorResponseHandler; |
|||
|
|||
@Override |
|||
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) { |
|||
try { |
|||
filterChain.doFilter(request, response); |
|||
} catch (AuthenticationException e) { |
|||
throw e; |
|||
} catch (Exception e) { |
|||
errorResponseHandler.handle(e, response); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,120 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.script.api.js; |
|||
|
|||
import java.util.regex.Pattern; |
|||
|
|||
public class JsValidator { |
|||
|
|||
static final Pattern ASYNC_PATTERN = Pattern.compile("\\basync\\b"); |
|||
static final Pattern AWAIT_PATTERN = Pattern.compile("\\bawait\\b"); |
|||
static final Pattern PROMISE_PATTERN = Pattern.compile("\\bPromise\\b"); |
|||
static final Pattern SET_TIMEOUT_PATTERN = Pattern.compile("\\bsetTimeout\\b"); |
|||
|
|||
public static String validate(String scriptBody) { |
|||
if (scriptBody == null || scriptBody.trim().isEmpty()) { |
|||
return "Script body is empty"; |
|||
} |
|||
|
|||
//Quick check
|
|||
if (!ASYNC_PATTERN.matcher(scriptBody).find() |
|||
&& !AWAIT_PATTERN.matcher(scriptBody).find() |
|||
&& !PROMISE_PATTERN.matcher(scriptBody).find() |
|||
&& !SET_TIMEOUT_PATTERN.matcher(scriptBody).find()) { |
|||
return null; |
|||
} |
|||
|
|||
//Recheck if quick check failed. Ignoring comments and strings
|
|||
String[] lines = scriptBody.split("\\r?\\n"); |
|||
boolean insideMultilineComment = false; |
|||
|
|||
for (String line : lines) { |
|||
String stripped = line; |
|||
|
|||
// Handle multiline comments
|
|||
if (insideMultilineComment) { |
|||
if (line.contains("*/")) { |
|||
insideMultilineComment = false; |
|||
stripped = line.substring(line.indexOf("*/") + 2); // continue after comment
|
|||
} else { |
|||
continue; // skip line inside multiline comment
|
|||
} |
|||
} |
|||
|
|||
// Check for start of multiline comment
|
|||
if (stripped.contains("/*")) { |
|||
int start = stripped.indexOf("/*"); |
|||
int end = stripped.indexOf("*/", start + 2); |
|||
|
|||
if (end != -1) { |
|||
// Inline multiline comment
|
|||
stripped = stripped.substring(0, start) + stripped.substring(end + 2); |
|||
} else { |
|||
// Starts a block comment, continues on next lines
|
|||
insideMultilineComment = true; |
|||
stripped = stripped.substring(0, start); |
|||
} |
|||
} |
|||
|
|||
stripped = stripInlineComment(stripped); |
|||
stripped = stripStringLiterals(stripped); |
|||
|
|||
if (ASYNC_PATTERN.matcher(stripped).find()) { |
|||
return "Script must not contain 'async' keyword."; |
|||
} |
|||
if (AWAIT_PATTERN.matcher(stripped).find()) { |
|||
return "Script must not contain 'await' keyword."; |
|||
} |
|||
if (PROMISE_PATTERN.matcher(stripped).find()) { |
|||
return "Script must not use 'Promise'."; |
|||
} |
|||
if (SET_TIMEOUT_PATTERN.matcher(stripped).find()) { |
|||
return "Script must not use 'setTimeout' method."; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
private static String stripInlineComment(String line) { |
|||
int index = line.indexOf("//"); |
|||
return index >= 0 ? line.substring(0, index) : line; |
|||
} |
|||
|
|||
private static String stripStringLiterals(String line) { |
|||
StringBuilder sb = new StringBuilder(); |
|||
boolean inSingleQuote = false; |
|||
boolean inDoubleQuote = false; |
|||
|
|||
for (int i = 0; i < line.length(); i++) { |
|||
char c = line.charAt(i); |
|||
|
|||
if (c == '"' && !inSingleQuote) { |
|||
inDoubleQuote = !inDoubleQuote; |
|||
continue; |
|||
} else if (c == '\'' && !inDoubleQuote) { |
|||
inSingleQuote = !inSingleQuote; |
|||
continue; |
|||
} |
|||
|
|||
if (!inSingleQuote && !inDoubleQuote) { |
|||
sb.append(c); |
|||
} |
|||
} |
|||
|
|||
return sb.toString(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,122 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.script.api; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.mockito.Mockito; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.stats.StatsCounter; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.TimeoutException; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.junit.jupiter.api.Assertions.assertNull; |
|||
import static org.junit.jupiter.api.Assertions.assertThrows; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.anyString; |
|||
import static org.mockito.Mockito.doCallRealMethod; |
|||
import static org.mockito.Mockito.doReturn; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.never; |
|||
import static org.mockito.Mockito.verify; |
|||
|
|||
class AbstractScriptInvokeServiceTest { |
|||
|
|||
AbstractScriptInvokeService service; |
|||
final UUID id = UUID.randomUUID(); |
|||
final String scriptBody = "return true;"; |
|||
final TenantId tenantId = TenantId.fromUUID(UUID.fromString("2ed9a658-45a5-4812-b212-9931f5749f30")); |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
service = mock(AbstractScriptInvokeService.class, Mockito.RETURNS_DEEP_STUBS); |
|||
|
|||
// Make sure core checks always pass
|
|||
doReturn(true).when(service).isExecEnabled(any()); |
|||
doReturn(50000L).when(service).getMaxScriptBodySize(); |
|||
|
|||
// Use real implementations
|
|||
doCallRealMethod().when(service).scriptBodySizeExceeded(anyString()); |
|||
doCallRealMethod().when(service).eval(any(), any(), any(), any(String[].class)); |
|||
doCallRealMethod().when(service).error(anyString()); |
|||
doCallRealMethod().when(service).validate(any(), anyString()); |
|||
} |
|||
|
|||
@Test |
|||
void evalWithValidationCallTest() throws ExecutionException, InterruptedException, TimeoutException { |
|||
ReflectionTestUtils.setField(service, "requestsCounter", mock(StatsCounter.class)); |
|||
ReflectionTestUtils.setField(service, "evalCallback", mock(FutureCallback.class)); |
|||
|
|||
doReturn(Futures.immediateFuture(id)).when(service).doEvalScript(any(), any(), anyString(), any(), any(String[].class)); |
|||
|
|||
var future = service.eval(tenantId, ScriptType.RULE_NODE_SCRIPT, scriptBody, "x", "y"); |
|||
|
|||
assertThat(future.get(30, TimeUnit.SECONDS)).isEqualTo(id); |
|||
verify(service).validate(any(), anyString()); |
|||
verify(service).validate(tenantId, scriptBody); |
|||
verify(service, never()).error(anyString()); |
|||
} |
|||
|
|||
@Test |
|||
void evalWithValidationCallErrorTest() throws ExecutionException, InterruptedException, TimeoutException { |
|||
doReturn(false).when(service).isExecEnabled(any()); |
|||
var future = service.eval(tenantId, ScriptType.RULE_NODE_SCRIPT, scriptBody, "x", "y"); |
|||
|
|||
ExecutionException ex = assertThrows(ExecutionException.class, future::get); |
|||
assertThat(ex.getCause().getMessage()).isEqualTo("Script Execution is disabled due to API limits!"); |
|||
assertThat(ex.getCause()).isInstanceOf(RuntimeException.class); |
|||
|
|||
verify(service).validate(any(), anyString()); |
|||
verify(service).validate(tenantId, scriptBody); |
|||
verify(service).error(anyString()); |
|||
} |
|||
|
|||
@Test |
|||
void validateScriptBodyTestExecEnabledTest() { |
|||
assertNull(service.validate(tenantId, scriptBody)); |
|||
verify(service).isExecEnabled(tenantId); |
|||
} |
|||
|
|||
@Test |
|||
void validateScriptBodyTestExecDisabledTest() { |
|||
doReturn(false).when(service).isExecEnabled(tenantId); |
|||
assertThat(service.validate(tenantId, scriptBody)).isEqualTo("Script Execution is disabled due to API limits!"); |
|||
verify(service).isExecEnabled(tenantId); |
|||
} |
|||
|
|||
@Test |
|||
void validateScriptBodySizeOKTest() { |
|||
assertNull(service.validate(tenantId, scriptBody)); |
|||
verify(service).isExecEnabled(tenantId); |
|||
verify(service).scriptBodySizeExceeded(scriptBody); |
|||
} |
|||
|
|||
@Test |
|||
void validateScriptBodySizeExceededTest() { |
|||
doReturn(10L).when(service).getMaxScriptBodySize(); |
|||
assertThat(service.validate(tenantId, scriptBody)).isEqualTo("Script body exceeds maximum allowed size of 10 symbols"); |
|||
verify(service).isExecEnabled(tenantId); |
|||
verify(service).scriptBodySizeExceeded(scriptBody); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,91 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.script.api.js; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import org.thingsboard.server.common.stats.StatsCounter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.mockito.Mockito; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.script.api.ScriptType; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.TimeoutException; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.junit.jupiter.api.Assertions.assertThrows; |
|||
import static org.junit.jupiter.api.Assertions.assertTrue; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.anyString; |
|||
import static org.mockito.Mockito.doCallRealMethod; |
|||
import static org.mockito.Mockito.doReturn; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.times; |
|||
import static org.mockito.Mockito.verify; |
|||
|
|||
@Slf4j |
|||
class AbstractJsInvokeServiceTest { |
|||
|
|||
AbstractJsInvokeService service; |
|||
final UUID id = UUID.randomUUID(); |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
service = mock(AbstractJsInvokeService.class, Mockito.RETURNS_DEEP_STUBS); |
|||
|
|||
ReflectionTestUtils.setField(service, "requestsCounter", mock(StatsCounter.class)); |
|||
ReflectionTestUtils.setField(service, "evalCallback", mock(FutureCallback.class)); |
|||
|
|||
// Make sure core checks always pass
|
|||
doReturn(true).when(service).isExecEnabled(any()); |
|||
doReturn(false).when(service).scriptBodySizeExceeded(anyString()); |
|||
doReturn(Futures.immediateFuture(id)).when(service).doEvalScript(any(), any(), anyString(), any(), any(String[].class)); |
|||
|
|||
// Use real implementations
|
|||
doCallRealMethod().when(service).eval(any(), any(), any(), any(String[].class)); |
|||
doCallRealMethod().when(service).error(anyString()); |
|||
doCallRealMethod().when(service).validate(any(), anyString()); |
|||
} |
|||
|
|||
@Test |
|||
void shouldReturnValidationErrorFromJsValidator() throws ExecutionException, InterruptedException { |
|||
String scriptWithAsync = "async function test() {}"; |
|||
|
|||
var future = service.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, scriptWithAsync, "a", "b"); |
|||
ExecutionException ex = assertThrows(ExecutionException.class, future::get); |
|||
assertTrue(ex.getCause().getMessage().contains("Script must not contain 'async' keyword.")); |
|||
assertThat(ex.getCause()).isInstanceOf(RuntimeException.class); |
|||
verify(service).isExecEnabled(any()); |
|||
verify(service).scriptBodySizeExceeded(any()); |
|||
} |
|||
|
|||
@Test |
|||
void shouldPassValidationAndCallSuperEval() throws ExecutionException, InterruptedException, TimeoutException { |
|||
String validScript = "function test() { return 42; }"; |
|||
var result = service.eval(TenantId.SYS_TENANT_ID, ScriptType.RULE_NODE_SCRIPT, validScript, "x", "y"); |
|||
|
|||
assertThat(result.get(30, TimeUnit.SECONDS)).isEqualTo(id); |
|||
verify(service, times(1)).isExecEnabled(any()); |
|||
verify(service, times(1)).scriptBodySizeExceeded(any()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,88 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.script.api.js; |
|||
|
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.NullAndEmptySource; |
|||
import org.junit.jupiter.params.provider.ValueSource; |
|||
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals; |
|||
import static org.junit.jupiter.api.Assertions.assertNotNull; |
|||
import static org.junit.jupiter.api.Assertions.assertNull; |
|||
|
|||
class JsValidatorTest { |
|||
|
|||
@ParameterizedTest(name = "should return error for script \"{0}\"") |
|||
@ValueSource(strings = { |
|||
"async function test() {}", |
|||
"const result = await someFunc();", |
|||
"const result =\nawait\tsomeFunc();", |
|||
"setTimeout(1000);", |
|||
"new Promise((resolve) => {});", |
|||
"function test() { return 42; } \n\t await test()", |
|||
""" |
|||
function init() { |
|||
await doSomething(); |
|||
} |
|||
""", |
|||
}) |
|||
void shouldReturnErrorForInvalidScripts(String script) { |
|||
assertNotNull(JsValidator.validate(script)); |
|||
} |
|||
|
|||
@ParameterizedTest(name = "should pass validation for script: \"{0}\"") |
|||
@ValueSource(strings = { |
|||
"function test() { return 42; }", |
|||
"const result = 10 * 2;", |
|||
"// async is a keyword but not used: 'const word = \"async\";'", |
|||
"let note = 'setTimeout tight';", |
|||
|
|||
"const word = \"async\";", |
|||
"const word = \"setTimeout\";", |
|||
"const word = \"Promise\";", |
|||
"const word = \"await\";", |
|||
|
|||
"const word = 'async';", |
|||
"const word = 'setTimeout';", |
|||
"const word = 'Promise';", |
|||
"const word = 'await';", |
|||
|
|||
"//function test() { return 42; }", |
|||
"// const result = 10 * 2;", |
|||
"// async is a keyword but not used: 'const word = \"async\";'", |
|||
"//setTimeout(1);", |
|||
|
|||
"a=b+c; // await for a day", |
|||
"return new // Promise((resolve) => {", |
|||
"hello(); // async is a keyword but not used: 'const word = \"async\";'", |
|||
"setGoal(a); //setTimeout(1);", |
|||
|
|||
" /* new Promise((resolve) => {}); // */ return 'await';", |
|||
" /* async */ function calc() {", |
|||
"/* async function abc() { \n await new Promise ( \t setTimeout () ) \n } \n*/", |
|||
}) |
|||
void shouldReturnNullForValidScripts(String script) { |
|||
assertNull(JsValidator.validate(script)); |
|||
} |
|||
|
|||
@ParameterizedTest(name = "should return 'Script body is empty' for input: \"{0}\"") |
|||
@NullAndEmptySource |
|||
@ValueSource(strings = {" ", "\t", "\n"}) |
|||
void shouldReturnErrorForEmptyOrNullScripts(String script) { |
|||
assertEquals("Script body is empty", JsValidator.validate(script)); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,98 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.component; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.mockito.Mockito; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; |
|||
import org.thingsboard.server.common.data.plugin.ComponentDescriptor; |
|||
import org.thingsboard.server.common.data.plugin.ComponentScope; |
|||
import org.thingsboard.server.common.data.plugin.ComponentType; |
|||
import org.thingsboard.server.dao.exception.IncorrectParameterException; |
|||
|
|||
import static org.junit.jupiter.api.Assertions.assertFalse; |
|||
import static org.junit.jupiter.api.Assertions.assertThrows; |
|||
import static org.junit.jupiter.api.Assertions.assertTrue; |
|||
|
|||
class BaseComponentDescriptorServiceTest { |
|||
|
|||
private BaseComponentDescriptorService service; |
|||
private ComponentDescriptor componentDescriptor; |
|||
private TenantId tenantId; |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
service = Mockito.spy(BaseComponentDescriptorService.class); |
|||
tenantId = TenantId.SYS_TENANT_ID; |
|||
|
|||
// Create a simple component descriptor
|
|||
componentDescriptor = new ComponentDescriptor(); |
|||
componentDescriptor.setType(ComponentType.ACTION); |
|||
componentDescriptor.setScope(ComponentScope.TENANT); |
|||
componentDescriptor.setClusteringMode(ComponentClusteringMode.ENABLED); |
|||
componentDescriptor.setName("Test Component"); |
|||
componentDescriptor.setClazz("org.thingsboard.test.TestComponent"); |
|||
|
|||
// Create configuration descriptor with schema from JSON string
|
|||
String configDescriptorJson = """ |
|||
{ |
|||
"schema": { |
|||
"type": "object", |
|||
"properties": { |
|||
"testField": { |
|||
"type": "string" |
|||
} |
|||
}, |
|||
"required": ["testField"] |
|||
} |
|||
}"""; |
|||
|
|||
componentDescriptor.setConfigurationDescriptor(JacksonUtil.toJsonNode(configDescriptorJson)); |
|||
} |
|||
|
|||
@Test |
|||
void testValidate() { |
|||
// Create valid configuration from JSON string
|
|||
String validConfigJson = "{\"testField\": \"test value\"}"; |
|||
JsonNode validConfig = JacksonUtil.toJsonNode(validConfigJson); |
|||
|
|||
// Create invalid configuration (missing required field) from JSON string
|
|||
String invalidConfigJson = "{}"; |
|||
JsonNode invalidConfig = JacksonUtil.toJsonNode(invalidConfigJson); |
|||
|
|||
// Test valid configuration
|
|||
boolean validResult = service.validate(tenantId, componentDescriptor, validConfig); |
|||
assertTrue(validResult, "Valid configuration should pass validation"); |
|||
|
|||
// Test invalid configuration
|
|||
boolean invalidResult = service.validate(tenantId, componentDescriptor, invalidConfig); |
|||
assertFalse(invalidResult, "Invalid configuration should fail validation"); |
|||
|
|||
// Test with component descriptor without schema
|
|||
ComponentDescriptor noSchemaDescriptor = new ComponentDescriptor(componentDescriptor); |
|||
noSchemaDescriptor.setConfigurationDescriptor(JacksonUtil.toJsonNode("{}")); |
|||
|
|||
// Should throw exception when schema is missing
|
|||
assertThrows(IncorrectParameterException.class, () -> { |
|||
service.validate(tenantId, noSchemaDescriptor, validConfig); |
|||
}, "Should throw exception when schema is missing"); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt; |
|||
|
|||
public class MaxRetransmissionsReachedException extends RuntimeException { |
|||
|
|||
public MaxRetransmissionsReachedException(String message) { |
|||
super(message); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,21 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt; |
|||
|
|||
@FunctionalInterface |
|||
public interface ReconnectStrategy { |
|||
long getNextReconnectDelay(); |
|||
} |
|||
@ -0,0 +1,82 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import java.util.concurrent.ThreadLocalRandom; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Getter |
|||
@Slf4j |
|||
public class ReconnectStrategyExponential implements ReconnectStrategy { |
|||
|
|||
public static final int DEFAULT_RECONNECT_INTERVAL_SEC = 10; |
|||
public static final int MAX_RECONNECT_INTERVAL_SEC = 60; |
|||
public static final int EXP_MAX = 8; |
|||
public static final long JITTER_MAX = 1; |
|||
private final long reconnectIntervalMinSeconds; |
|||
private final long reconnectIntervalMaxSeconds; |
|||
private long lastDisconnectNanoTime = 0; //isotonic time
|
|||
private long retryCount = 0; |
|||
|
|||
public ReconnectStrategyExponential(long reconnectIntervalMinSeconds) { |
|||
this.reconnectIntervalMaxSeconds = calculateIntervalMax(reconnectIntervalMinSeconds); |
|||
this.reconnectIntervalMinSeconds = calculateIntervalMin(reconnectIntervalMinSeconds); |
|||
} |
|||
|
|||
long calculateIntervalMax(long reconnectIntervalMinSeconds) { |
|||
return reconnectIntervalMinSeconds > MAX_RECONNECT_INTERVAL_SEC ? reconnectIntervalMinSeconds : MAX_RECONNECT_INTERVAL_SEC; |
|||
} |
|||
|
|||
long calculateIntervalMin(long reconnectIntervalMinSeconds) { |
|||
return Math.min((reconnectIntervalMinSeconds > 0 ? reconnectIntervalMinSeconds : DEFAULT_RECONNECT_INTERVAL_SEC), this.reconnectIntervalMaxSeconds); |
|||
} |
|||
|
|||
@Override |
|||
synchronized public long getNextReconnectDelay() { |
|||
final long currentNanoTime = getNanoTime(); |
|||
final long coolDownSpentNanos = currentNanoTime - lastDisconnectNanoTime; |
|||
lastDisconnectNanoTime = currentNanoTime; |
|||
if (isCooledDown(coolDownSpentNanos)) { |
|||
retryCount = 0; |
|||
return reconnectIntervalMinSeconds; |
|||
} |
|||
return calculateNextReconnectDelay() + calculateJitter(); |
|||
} |
|||
|
|||
long calculateJitter() { |
|||
return ThreadLocalRandom.current().nextInt() >= 0 ? JITTER_MAX : 0; |
|||
} |
|||
|
|||
long calculateNextReconnectDelay() { |
|||
return Math.min(reconnectIntervalMaxSeconds, reconnectIntervalMinSeconds + calculateExp(retryCount++)); |
|||
} |
|||
|
|||
long calculateExp(long e) { |
|||
return 1L << Math.min(e, EXP_MAX); |
|||
} |
|||
|
|||
boolean isCooledDown(long coolDownSpentNanos) { |
|||
return TimeUnit.NANOSECONDS.toSeconds(coolDownSpentNanos) > reconnectIntervalMaxSeconds + reconnectIntervalMinSeconds; |
|||
} |
|||
|
|||
long getNanoTime() { |
|||
return System.nanoTime(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,210 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import io.netty.buffer.ByteBuf; |
|||
import io.netty.buffer.PooledByteBufAllocator; |
|||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; |
|||
import io.netty.handler.codec.mqtt.MqttMessageType; |
|||
import io.netty.handler.codec.mqtt.MqttQoS; |
|||
import io.netty.util.ResourceLeakDetector; |
|||
import io.netty.util.concurrent.Future; |
|||
import io.netty.util.concurrent.Promise; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.awaitility.Awaitility; |
|||
import org.awaitility.core.ConditionTimeoutException; |
|||
import org.junit.jupiter.api.AfterEach; |
|||
import org.junit.jupiter.api.BeforeAll; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.testcontainers.hivemq.HiveMQContainer; |
|||
import org.testcontainers.junit.jupiter.Container; |
|||
import org.testcontainers.junit.jupiter.Testcontainers; |
|||
import org.testcontainers.utility.DockerImageName; |
|||
import org.thingsboard.common.util.AbstractListeningExecutor; |
|||
|
|||
import java.nio.charset.StandardCharsets; |
|||
import java.time.Duration; |
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
@Slf4j |
|||
@Testcontainers |
|||
class MqttClientTest { |
|||
|
|||
final int randomPort = 0; |
|||
|
|||
@Container |
|||
HiveMQContainer broker = new HiveMQContainer(DockerImageName.parse("hivemq/hivemq-ce").withTag("2025.2")); |
|||
|
|||
MqttTestProxy proxy; |
|||
|
|||
MqttClient client; |
|||
|
|||
AbstractListeningExecutor handlerExecutor; |
|||
|
|||
@BeforeAll |
|||
static void init() { |
|||
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); |
|||
} |
|||
|
|||
@BeforeEach |
|||
void setup() { |
|||
handlerExecutor = new AbstractListeningExecutor() { |
|||
@Override |
|||
protected int getThreadPollSize() { |
|||
return 1; |
|||
} |
|||
}; |
|||
handlerExecutor.init(); |
|||
} |
|||
|
|||
@AfterEach |
|||
void cleanup() { |
|||
if (client != null) { |
|||
client.disconnect(); |
|||
client = null; |
|||
} |
|||
if (proxy != null) { |
|||
proxy.stop(); |
|||
proxy = null; |
|||
} |
|||
handlerExecutor.destroy(); |
|||
handlerExecutor = null; |
|||
} |
|||
|
|||
@Test |
|||
void testConnectToBroker() { |
|||
// GIVEN
|
|||
var clientConfig = new MqttClientConfig(); |
|||
clientConfig.setOwnerId("Test[ConnectToBroker]"); |
|||
clientConfig.setClientId("connect"); |
|||
|
|||
client = MqttClient.create(clientConfig, null, handlerExecutor); |
|||
|
|||
// WHEN
|
|||
Promise<MqttConnectResult> connectFuture = client.connect(broker.getHost(), broker.getMqttPort()); |
|||
|
|||
// THEN
|
|||
assertThat(connectFuture).isNotNull(); |
|||
|
|||
Awaitility.await("waiting for client to connect") |
|||
.atMost(Duration.ofSeconds(10L)) |
|||
.until(connectFuture::isDone); |
|||
|
|||
assertThat(connectFuture.isSuccess()).isTrue(); |
|||
|
|||
MqttConnectResult actualConnectResult = connectFuture.getNow(); |
|||
assertThat(actualConnectResult).isNotNull(); |
|||
assertThat(actualConnectResult.isSuccess()).isTrue(); |
|||
assertThat(actualConnectResult.getReturnCode()).isEqualTo(MqttConnectReturnCode.CONNECTION_ACCEPTED); |
|||
|
|||
assertThat(client.isConnected()).isTrue(); |
|||
} |
|||
|
|||
@Test |
|||
void testDisconnectDueToKeepAliveIfNoActivity() { |
|||
// GIVEN
|
|||
proxy = MqttTestProxy.builder() |
|||
.localPort(randomPort) |
|||
.brokerHost(broker.getHost()) |
|||
.brokerPort(broker.getMqttPort()) |
|||
.brokerToClientInterceptor(msg -> msg.fixedHeader().messageType() != MqttMessageType.PINGRESP) // drop all ping responses to simulate broker down
|
|||
.build(); |
|||
|
|||
int idleTimeoutSeconds = 2; |
|||
|
|||
var clientConfig = new MqttClientConfig(); |
|||
clientConfig.setOwnerId("Test[KeepAliveDisconnect]"); |
|||
clientConfig.setClientId("no-activity-disconnect"); |
|||
clientConfig.setTimeoutSeconds(idleTimeoutSeconds); |
|||
clientConfig.setReconnect(false); // disable auto reconnect
|
|||
client = MqttClient.create(clientConfig, null, handlerExecutor); |
|||
|
|||
// WHEN-THEN
|
|||
connect(broker.getHost(), proxy.getPort()); |
|||
|
|||
// no activity...
|
|||
|
|||
Awaitility.await("waiting for client to disconnect") |
|||
.pollDelay(Duration.ofSeconds(idleTimeoutSeconds * 2)) // 2 seconds to wait for the first idle event and then 2 seconds for scheduled disconnect to fire
|
|||
.atMost(Duration.ofSeconds(10)) |
|||
.untilAsserted(() -> assertThat(client.isConnected()).isFalse()); |
|||
} |
|||
|
|||
@Test |
|||
void testRetransmission() { |
|||
// GIVEN
|
|||
proxy = MqttTestProxy.builder() |
|||
.localPort(randomPort) |
|||
.brokerHost(broker.getHost()) |
|||
.brokerPort(broker.getMqttPort()) |
|||
.brokerToClientInterceptor(msg -> msg.fixedHeader().messageType() != MqttMessageType.PUBACK) // drop all pubacks to allow retransmission to happen
|
|||
.build(); |
|||
|
|||
// create client
|
|||
var clientConfig = new MqttClientConfig(); |
|||
clientConfig.setOwnerId("Test[Retransmission]"); |
|||
clientConfig.setClientId("retransmission"); |
|||
clientConfig.setRetransmissionConfig(new MqttClientConfig.RetransmissionConfig(1, 1000L, 0d)); |
|||
client = MqttClient.create(clientConfig, null, handlerExecutor); |
|||
|
|||
// connect to a broker
|
|||
connect(broker.getHost(), proxy.getPort()); |
|||
|
|||
// subscribe to a topic
|
|||
String topic = "test-topic"; |
|||
List<ByteBuf> receivedMessages = Collections.synchronizedList(new ArrayList<>(2)); |
|||
Future<Void> subscribeFuture = client.on(topic, (__, payload) -> { |
|||
receivedMessages.add(payload); |
|||
return Futures.immediateVoidFuture(); |
|||
}); |
|||
Awaitility.await("waiting for client to subscribe to a topic") |
|||
.atMost(Duration.ofSeconds(10L)) |
|||
.until(subscribeFuture::isDone); |
|||
|
|||
// WHEN
|
|||
// publish a message
|
|||
ByteBuf message = PooledByteBufAllocator.DEFAULT.buffer().writeBytes("test message".getBytes(StandardCharsets.UTF_8)); |
|||
client.publish(topic, message, MqttQoS.AT_LEAST_ONCE); |
|||
|
|||
// THEN
|
|||
// wait enough time so that retransmission happens and stops
|
|||
// if retransmission works incorrectly waiting 10 seconds allows for additional retransmissions to happen
|
|||
try { |
|||
Awaitility.await("wait up to 10s, stop early if too many messages") |
|||
.atMost(Duration.ofSeconds(10L)) |
|||
.pollInterval(Duration.ofMillis(100)) |
|||
.until(() -> receivedMessages.size() > 2); |
|||
} catch (ConditionTimeoutException __) { |
|||
// didn't exceed 2 messages
|
|||
} |
|||
|
|||
assertThat(receivedMessages).size().describedAs("incorrect number of messages received, expected 2 (original plus one retransmitted)").isEqualTo(2); |
|||
} |
|||
|
|||
private void connect(String host, int port) { |
|||
Promise<MqttConnectResult> connectFuture = client.connect(host, port); |
|||
Awaitility.await("waiting for client to connect") |
|||
.atMost(Duration.ofSeconds(10L)) |
|||
.until(connectFuture::isSuccess); |
|||
} |
|||
|
|||
} |
|||
@ -1,63 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt; |
|||
|
|||
import io.netty.channel.Channel; |
|||
import io.netty.channel.ChannelFuture; |
|||
import io.netty.channel.ChannelFutureListener; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.DefaultEventLoop; |
|||
import io.netty.handler.timeout.IdleStateEvent; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
|
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.Mockito.after; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.mockito.Mockito.when; |
|||
|
|||
class MqttPingHandlerTest { |
|||
|
|||
static final int KEEP_ALIVE_SECONDS = 0; |
|||
static final int PROCESS_SEND_DISCONNECT_MSG_TIME_MS = 500; |
|||
|
|||
MqttPingHandler mqttPingHandler; |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
mqttPingHandler = new MqttPingHandler(KEEP_ALIVE_SECONDS); |
|||
} |
|||
|
|||
@Test |
|||
void givenChannelReaderIdleState_whenNoPingResponse_thenDisconnectClient() throws Exception { |
|||
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); |
|||
Channel channel = mock(Channel.class); |
|||
when(ctx.channel()).thenReturn(channel); |
|||
when(channel.eventLoop()).thenReturn(new DefaultEventLoop()); |
|||
ChannelFuture channelFuture = mock(ChannelFuture.class); |
|||
when(channel.writeAndFlush(any())).thenReturn(channelFuture); |
|||
|
|||
mqttPingHandler.userEventTriggered(ctx, IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT); |
|||
verify( |
|||
channelFuture, |
|||
after(TimeUnit.SECONDS.toMillis(KEEP_ALIVE_SECONDS) + PROCESS_SEND_DISCONNECT_MSG_TIME_MS) |
|||
).addListener(eq(ChannelFutureListener.CLOSE)); |
|||
} |
|||
} |
|||
@ -0,0 +1,202 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt; |
|||
|
|||
import io.netty.bootstrap.Bootstrap; |
|||
import io.netty.bootstrap.ServerBootstrap; |
|||
import io.netty.channel.Channel; |
|||
import io.netty.channel.ChannelFuture; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.ChannelInitializer; |
|||
import io.netty.channel.EventLoopGroup; |
|||
import io.netty.channel.SimpleChannelInboundHandler; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
import io.netty.channel.socket.SocketChannel; |
|||
import io.netty.channel.socket.nio.NioServerSocketChannel; |
|||
import io.netty.channel.socket.nio.NioSocketChannel; |
|||
import io.netty.handler.codec.mqtt.MqttDecoder; |
|||
import io.netty.handler.codec.mqtt.MqttEncoder; |
|||
import io.netty.handler.codec.mqtt.MqttMessage; |
|||
import io.netty.util.ReferenceCountUtil; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import java.net.InetSocketAddress; |
|||
import java.util.function.Predicate; |
|||
|
|||
@Slf4j |
|||
public class MqttTestProxy { |
|||
|
|||
private final EventLoopGroup bossGroup; |
|||
private final EventLoopGroup workerGroup; |
|||
|
|||
private Channel clientToProxyChannel; |
|||
private Channel proxyToBrokerChannel; |
|||
|
|||
private final int assignedPort; |
|||
|
|||
private boolean stopped; |
|||
|
|||
private final Predicate<MqttMessage> brokerToClientInterceptor; |
|||
|
|||
private MqttTestProxy(Builder builder) { |
|||
log.info("Starting MQTT proxy..."); |
|||
|
|||
brokerToClientInterceptor = builder.brokerToClientInterceptor != null ? builder.brokerToClientInterceptor : msg -> true; |
|||
bossGroup = new NioEventLoopGroup(1); |
|||
workerGroup = new NioEventLoopGroup(1); |
|||
|
|||
ServerBootstrap proxyBootstrap = new ServerBootstrap(); |
|||
proxyBootstrap.group(bossGroup, workerGroup) |
|||
.channel(NioServerSocketChannel.class) |
|||
.childHandler(new ChannelInitializer<SocketChannel>() { |
|||
@Override |
|||
protected void initChannel(SocketChannel channel) { |
|||
clientToProxyChannel = channel; |
|||
clientToProxyChannel.config().setAutoRead(false); // do not accept data before we connected to a broker
|
|||
|
|||
connectToBroker(builder.brokerHost, builder.brokerPort).addListener(future -> { |
|||
if (future.isSuccess()) { |
|||
clientToProxyChannel.pipeline().addLast("mqttDecoder", new MqttDecoder()); |
|||
clientToProxyChannel.pipeline().addLast("mqttToBroker", new MqttRelayHandler(proxyToBrokerChannel, null)); |
|||
clientToProxyChannel.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE); |
|||
|
|||
clientToProxyChannel.config().setAutoRead(true); // start accepting data for a client
|
|||
} else { |
|||
log.error("Failed to connect to broker", future.cause()); |
|||
clientToProxyChannel.close(); |
|||
} |
|||
}); |
|||
} |
|||
}); |
|||
|
|||
try { |
|||
Channel proxyChannel = proxyBootstrap.bind(builder.localPort).sync().channel(); |
|||
assignedPort = ((InetSocketAddress) proxyChannel.localAddress()).getPort(); |
|||
} catch (Exception e) { |
|||
log.error("Failed to start MQTT proxy", e); |
|||
throw new RuntimeException("Failed to start MQTT proxy", e); |
|||
} |
|||
|
|||
log.info("MQTT proxy started on port {}", assignedPort); |
|||
} |
|||
|
|||
private ChannelFuture connectToBroker(String brokerHost, int brokerPort) { |
|||
Bootstrap proxyToBrokerBootstrap = new Bootstrap(); |
|||
proxyToBrokerBootstrap.group(workerGroup) |
|||
.channel(NioSocketChannel.class) |
|||
.handler(new ChannelInitializer<SocketChannel>() { |
|||
@Override |
|||
protected void initChannel(SocketChannel channel) { |
|||
proxyToBrokerChannel = channel; |
|||
proxyToBrokerChannel.pipeline().addLast(new MqttDecoder()); |
|||
proxyToBrokerChannel.pipeline().addLast("mqttToClient", new MqttRelayHandler(clientToProxyChannel, brokerToClientInterceptor)); |
|||
proxyToBrokerChannel.pipeline().addLast(MqttEncoder.INSTANCE); |
|||
} |
|||
}); |
|||
return proxyToBrokerBootstrap.connect(brokerHost, brokerPort); |
|||
} |
|||
|
|||
private static class MqttRelayHandler extends SimpleChannelInboundHandler<MqttMessage> { |
|||
|
|||
private final Channel targetChannel; |
|||
private final Predicate<MqttMessage> interceptor; |
|||
|
|||
private MqttRelayHandler(Channel targetChannel, Predicate<MqttMessage> interceptor) { |
|||
this.targetChannel = targetChannel; |
|||
this.interceptor = interceptor; |
|||
} |
|||
|
|||
@Override |
|||
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) { |
|||
log.debug("Received message: {}", msg.fixedHeader().messageType()); |
|||
if (interceptor == null || interceptor.test(msg)) { |
|||
if (targetChannel.isActive()) { |
|||
targetChannel.writeAndFlush(ReferenceCountUtil.retain(msg)); |
|||
} |
|||
} else { |
|||
log.info("Dropping message: {}", msg.fixedHeader().messageType()); |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
public void stop() { |
|||
if (stopped) { |
|||
log.info("MQTT proxy was already stopped"); |
|||
return; |
|||
} |
|||
|
|||
stopped = true; |
|||
|
|||
log.info("Stopping MQTT proxy..."); |
|||
|
|||
if (clientToProxyChannel != null) { |
|||
clientToProxyChannel.close(); |
|||
} |
|||
if (proxyToBrokerChannel != null) { |
|||
proxyToBrokerChannel.close(); |
|||
} |
|||
if (bossGroup != null) { |
|||
bossGroup.shutdownGracefully(); |
|||
} |
|||
if (workerGroup != null) { |
|||
workerGroup.shutdownGracefully(); |
|||
} |
|||
|
|||
log.info("MQTT proxy stopped"); |
|||
} |
|||
|
|||
public int getPort() { |
|||
return assignedPort; |
|||
} |
|||
|
|||
public static Builder builder() { |
|||
return new Builder(); |
|||
} |
|||
|
|||
public static class Builder { |
|||
|
|||
private int localPort; |
|||
private String brokerHost; |
|||
private int brokerPort; |
|||
private Predicate<MqttMessage> brokerToClientInterceptor; |
|||
|
|||
public Builder localPort(int localPort) { |
|||
this.localPort = localPort; |
|||
return this; |
|||
} |
|||
|
|||
public Builder brokerHost(String brokerHost) { |
|||
this.brokerHost = brokerHost; |
|||
return this; |
|||
} |
|||
|
|||
public Builder brokerPort(int brokerPort) { |
|||
this.brokerPort = brokerPort; |
|||
return this; |
|||
} |
|||
|
|||
public Builder brokerToClientInterceptor(Predicate<MqttMessage> interceptor) { |
|||
this.brokerToClientInterceptor = interceptor; |
|||
return this; |
|||
} |
|||
|
|||
public MqttTestProxy build() { |
|||
return new MqttTestProxy(this); |
|||
} |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,95 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.jupiter.api.parallel.Execution; |
|||
import org.junit.jupiter.api.parallel.ExecutionMode; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.ValueSource; |
|||
import org.mockito.Mockito; |
|||
import org.mockito.stubbing.Answer; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collection; |
|||
import java.util.concurrent.BlockingQueue; |
|||
import java.util.concurrent.LinkedBlockingDeque; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicLong; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.assertj.core.data.Offset.offset; |
|||
import static org.mockito.ArgumentMatchers.anyLong; |
|||
import static org.mockito.BDDMockito.willAnswer; |
|||
import static org.thingsboard.mqtt.ReconnectStrategyExponential.EXP_MAX; |
|||
import static org.thingsboard.mqtt.ReconnectStrategyExponential.JITTER_MAX; |
|||
|
|||
@Slf4j |
|||
class ReconnectStrategyExponentialTest { |
|||
|
|||
@Execution(ExecutionMode.SAME_THREAD) // just for convenient log reading
|
|||
@ParameterizedTest |
|||
@ValueSource(ints = {1, 0, 60}) |
|||
public void exponentialReconnectDelayTest(final int reconnectIntervalMinSeconds) { |
|||
final ReconnectStrategyExponential strategy = Mockito.spy(new ReconnectStrategyExponential(reconnectIntervalMinSeconds)); |
|||
log.info("=== Reconnect delay test for ReconnectStrategyExponential({}) : calculated min [{}] max [{}] ===", reconnectIntervalMinSeconds, strategy.getReconnectIntervalMinSeconds(), strategy.getReconnectIntervalMaxSeconds()); |
|||
final AtomicLong nanoTime = new AtomicLong(System.nanoTime()); |
|||
willAnswer((x) -> nanoTime.get()).given(strategy).getNanoTime(); |
|||
final LinkedBlockingDeque<Long> jittersCaptured = new LinkedBlockingDeque<>(); |
|||
final LinkedBlockingDeque<Long> expCaptured = new LinkedBlockingDeque<>(); |
|||
|
|||
willAnswer(captureResult(jittersCaptured)).given(strategy).calculateJitter(); |
|||
willAnswer(captureResult(expCaptured)).given(strategy).calculateExp(anyLong()); |
|||
|
|||
for (int phase = 0; phase < 3; phase++) { |
|||
log.info("== Phase {} ==", phase); |
|||
long previousDelay = 0; |
|||
for (int i = 0; i < EXP_MAX + 4; i++) { |
|||
final long nextReconnectDelay = strategy.getNextReconnectDelay(); |
|||
nanoTime.addAndGet(TimeUnit.SECONDS.toNanos(nextReconnectDelay)); |
|||
log.info("Retry [{}] Delay [{}] : min [{}] exp [{}] jitter [{}]", strategy.getRetryCount(), nextReconnectDelay, strategy.getReconnectIntervalMinSeconds(), expCaptured.peekLast(), jittersCaptured.peekLast()); |
|||
assertThat(previousDelay).satisfiesAnyOf( |
|||
v -> assertThat(v).isLessThanOrEqualTo(nextReconnectDelay), |
|||
v -> assertThat(v).isCloseTo(nextReconnectDelay, offset(JITTER_MAX)) // Adjust tolerance as needed
|
|||
); |
|||
previousDelay = nextReconnectDelay; |
|||
} |
|||
log.info("Jitters captured: {}", drainAll(jittersCaptured)); |
|||
log.info("Exponents captured: {}", drainAll(expCaptured)); |
|||
assertThat(previousDelay).isCloseTo(strategy.getReconnectIntervalMaxSeconds(), offset(JITTER_MAX)); |
|||
|
|||
final long coolDownPeriodSec = strategy.getReconnectIntervalMinSeconds() + strategy.getReconnectIntervalMaxSeconds() + 1; |
|||
log.info("Cooling down for [{}] seconds ...", coolDownPeriodSec); |
|||
nanoTime.addAndGet(TimeUnit.SECONDS.toNanos(coolDownPeriodSec)); |
|||
assertThat(strategy.isCooledDown(TimeUnit.SECONDS.toNanos(coolDownPeriodSec))).as("cooled down").isTrue(); |
|||
} |
|||
} |
|||
|
|||
private Answer<Long> captureResult(Collection<Long> collection) { |
|||
return invocation -> { |
|||
long result = (long) invocation.callRealMethod(); |
|||
collection.add(result); |
|||
return result; |
|||
}; |
|||
} |
|||
|
|||
private Collection<Long> drainAll(BlockingQueue<Long> jittersCaptured) { |
|||
Collection<Long> elements = new ArrayList<>(); |
|||
jittersCaptured.drainTo(elements); |
|||
return elements; |
|||
} |
|||
|
|||
} |
|||
@ -1,151 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt.integration; |
|||
|
|||
import io.netty.buffer.Unpooled; |
|||
import io.netty.channel.EventLoopGroup; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
import io.netty.handler.codec.mqtt.MqttMessageType; |
|||
import io.netty.handler.codec.mqtt.MqttQoS; |
|||
import io.netty.util.concurrent.Future; |
|||
import io.netty.util.concurrent.Promise; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.jupiter.api.AfterEach; |
|||
import org.junit.jupiter.api.Assertions; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.parallel.ResourceLock; |
|||
import org.thingsboard.common.util.AbstractListeningExecutor; |
|||
import org.thingsboard.mqtt.MqttClient; |
|||
import org.thingsboard.mqtt.MqttClientConfig; |
|||
import org.thingsboard.mqtt.MqttConnectResult; |
|||
import org.thingsboard.mqtt.integration.server.MqttServer; |
|||
|
|||
import java.nio.charset.StandardCharsets; |
|||
import java.util.List; |
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.TimeoutException; |
|||
|
|||
@ResourceLock("port8885") // test MQTT server port
|
|||
@Slf4j |
|||
public class MqttIntegrationTest { |
|||
|
|||
static final String MQTT_HOST = "localhost"; |
|||
static final int KEEPALIVE_TIMEOUT_SECONDS = 2; |
|||
static final long RECONNECT_DELAY_SECONDS = 10L; |
|||
|
|||
EventLoopGroup eventLoopGroup; |
|||
MqttServer mqttServer; |
|||
|
|||
MqttClient mqttClient; |
|||
|
|||
AbstractListeningExecutor handlerExecutor; |
|||
|
|||
@BeforeEach |
|||
public void init() throws Exception { |
|||
this.handlerExecutor = new AbstractListeningExecutor() { |
|||
@Override |
|||
protected int getThreadPollSize() { |
|||
return 4; |
|||
} |
|||
}; |
|||
handlerExecutor.init(); |
|||
|
|||
this.eventLoopGroup = new NioEventLoopGroup(); |
|||
|
|||
this.mqttServer = new MqttServer(); |
|||
this.mqttServer.init(); |
|||
} |
|||
|
|||
@AfterEach |
|||
public void destroy() throws InterruptedException { |
|||
if (this.mqttClient != null) { |
|||
this.mqttClient.disconnect(); |
|||
} |
|||
if (this.mqttServer != null) { |
|||
this.mqttServer.shutdown(); |
|||
} |
|||
if (this.eventLoopGroup != null) { |
|||
this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); |
|||
} |
|||
if (this.handlerExecutor != null) { |
|||
this.handlerExecutor.destroy(); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void givenActiveMqttClient_whenNoActivityForKeepAliveTimeout_thenDisconnectClient() throws Throwable { |
|||
//given
|
|||
this.mqttClient = initClient(); |
|||
|
|||
log.warn("Sending publish messages..."); |
|||
CountDownLatch latch = new CountDownLatch(3); |
|||
for (int i = 0; i < 3; i++) { |
|||
Thread.sleep(30); |
|||
Future<Void> pubFuture = publishMsg(); |
|||
pubFuture.addListener(future -> latch.countDown()); |
|||
} |
|||
|
|||
log.warn("Waiting for messages acknowledgments..."); |
|||
boolean awaitResult = latch.await(10, TimeUnit.SECONDS); |
|||
Assertions.assertTrue(awaitResult); |
|||
log.warn("Messages are delivered successfully..."); |
|||
|
|||
//when
|
|||
log.warn("Starting idle period..."); |
|||
Thread.sleep(5000); |
|||
|
|||
//then
|
|||
List<MqttMessageType> allReceivedEvents = this.mqttServer.getEventsFromClient(); |
|||
long disconnectCount = allReceivedEvents.stream().filter(type -> type == MqttMessageType.DISCONNECT).count(); |
|||
|
|||
Assertions.assertEquals(1, disconnectCount); |
|||
} |
|||
|
|||
private Future<Void> publishMsg() { |
|||
return this.mqttClient.publish( |
|||
"test/topic", |
|||
Unpooled.wrappedBuffer("payload".getBytes(StandardCharsets.UTF_8)), |
|||
MqttQoS.AT_MOST_ONCE); |
|||
} |
|||
|
|||
private MqttClient initClient() throws Exception { |
|||
MqttClientConfig config = new MqttClientConfig(); |
|||
config.setOwnerId("MqttIntegrationTest"); |
|||
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS); |
|||
config.setReconnectDelay(RECONNECT_DELAY_SECONDS); |
|||
MqttClient client = MqttClient.create(config, null, handlerExecutor); |
|||
client.setEventLoop(this.eventLoopGroup); |
|||
Promise<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort()); |
|||
|
|||
String hostPort = MQTT_HOST + ":" + this.mqttServer.getMqttPort(); |
|||
MqttConnectResult result; |
|||
try { |
|||
result = connectFuture.get(10, TimeUnit.SECONDS); |
|||
} catch (TimeoutException ex) { |
|||
connectFuture.cancel(true); |
|||
client.disconnect(); |
|||
throw new RuntimeException(String.format("Failed to connect to MQTT server at %s.", hostPort)); |
|||
} |
|||
if (!result.isSuccess()) { |
|||
connectFuture.cancel(true); |
|||
client.disconnect(); |
|||
throw new RuntimeException(String.format("Failed to connect to MQTT server at %s. Result code is: %s", hostPort, result.getReturnCode())); |
|||
} |
|||
return client; |
|||
} |
|||
} |
|||
@ -1,84 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt.integration.server; |
|||
|
|||
import io.netty.bootstrap.ServerBootstrap; |
|||
import io.netty.channel.Channel; |
|||
import io.netty.channel.ChannelInitializer; |
|||
import io.netty.channel.ChannelOption; |
|||
import io.netty.channel.ChannelPipeline; |
|||
import io.netty.channel.EventLoopGroup; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
import io.netty.channel.socket.SocketChannel; |
|||
import io.netty.channel.socket.nio.NioServerSocketChannel; |
|||
import io.netty.handler.codec.mqtt.MqttDecoder; |
|||
import io.netty.handler.codec.mqtt.MqttEncoder; |
|||
import io.netty.handler.codec.mqtt.MqttMessageType; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.CopyOnWriteArrayList; |
|||
|
|||
@Slf4j |
|||
public class MqttServer { |
|||
|
|||
@Getter |
|||
private final List<MqttMessageType> eventsFromClient = new CopyOnWriteArrayList<>(); |
|||
@Getter |
|||
private final int mqttPort = 8885; |
|||
|
|||
private Channel serverChannel; |
|||
private EventLoopGroup bossGroup; |
|||
private EventLoopGroup workerGroup; |
|||
|
|||
public void init() throws Exception { |
|||
log.info("Starting MQTT server on port {}...", mqttPort); |
|||
bossGroup = new NioEventLoopGroup(); |
|||
workerGroup = new NioEventLoopGroup(); |
|||
ServerBootstrap b = new ServerBootstrap(); |
|||
b.group(bossGroup, workerGroup) |
|||
.channel(NioServerSocketChannel.class) |
|||
.childHandler(new ChannelInitializer<SocketChannel>() { |
|||
@Override |
|||
protected void initChannel(SocketChannel ch) throws Exception { |
|||
ChannelPipeline pipeline = ch.pipeline(); |
|||
pipeline.addLast("decoder", new MqttDecoder(65536)); |
|||
pipeline.addLast("encoder", MqttEncoder.INSTANCE); |
|||
|
|||
MqttTransportHandler handler = new MqttTransportHandler(eventsFromClient); |
|||
|
|||
pipeline.addLast(handler); |
|||
ch.closeFuture().addListener(handler); |
|||
} |
|||
}) |
|||
.childOption(ChannelOption.SO_KEEPALIVE, true); |
|||
|
|||
serverChannel = b.bind(mqttPort).sync().channel(); |
|||
log.info("Mqtt transport started!"); |
|||
} |
|||
|
|||
public void shutdown() throws InterruptedException { |
|||
log.info("Stopping MQTT transport!"); |
|||
try { |
|||
serverChannel.close().sync(); |
|||
} finally { |
|||
workerGroup.shutdownGracefully(); |
|||
bossGroup.shutdownGracefully(); |
|||
} |
|||
log.info("MQTT transport stopped!"); |
|||
} |
|||
} |
|||
@ -1,141 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.mqtt.integration.server; |
|||
|
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.ChannelInboundHandlerAdapter; |
|||
import io.netty.handler.codec.mqtt.MqttConnAckMessage; |
|||
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; |
|||
import io.netty.handler.codec.mqtt.MqttConnectMessage; |
|||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode; |
|||
import io.netty.handler.codec.mqtt.MqttFixedHeader; |
|||
import io.netty.handler.codec.mqtt.MqttMessage; |
|||
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; |
|||
import io.netty.handler.codec.mqtt.MqttMessageType; |
|||
import io.netty.handler.codec.mqtt.MqttPubAckMessage; |
|||
import io.netty.handler.codec.mqtt.MqttPublishMessage; |
|||
import io.netty.util.ReferenceCountUtil; |
|||
import io.netty.util.concurrent.Future; |
|||
import io.netty.util.concurrent.GenericFutureListener; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import java.util.List; |
|||
import java.util.UUID; |
|||
|
|||
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK; |
|||
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNECT; |
|||
import static io.netty.handler.codec.mqtt.MqttMessageType.DISCONNECT; |
|||
import static io.netty.handler.codec.mqtt.MqttMessageType.PINGREQ; |
|||
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK; |
|||
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBLISH; |
|||
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; |
|||
|
|||
@Slf4j |
|||
public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> { |
|||
|
|||
private final List<MqttMessageType> eventsFromClient; |
|||
private final UUID sessionId; |
|||
|
|||
MqttTransportHandler(List<MqttMessageType> eventsFromClient) { |
|||
this.sessionId = UUID.randomUUID(); |
|||
this.eventsFromClient = eventsFromClient; |
|||
} |
|||
|
|||
@Override |
|||
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
|||
log.trace("[{}] Processing msg: {}", sessionId, msg); |
|||
try { |
|||
if (msg instanceof MqttMessage) { |
|||
MqttMessage message = (MqttMessage) msg; |
|||
if (message.decoderResult().isSuccess()) { |
|||
processMqttMsg(ctx, message); |
|||
} else { |
|||
log.error("[{}] Message decoding failed: {}", sessionId, message.decoderResult().cause().getMessage()); |
|||
ctx.close(); |
|||
} |
|||
} else { |
|||
log.debug("[{}] Received non mqtt message: {}", sessionId, msg.getClass().getSimpleName()); |
|||
ctx.close(); |
|||
} |
|||
} finally { |
|||
ReferenceCountUtil.safeRelease(msg); |
|||
} |
|||
} |
|||
|
|||
void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { |
|||
if (msg.fixedHeader() == null) { |
|||
ctx.close(); |
|||
return; |
|||
} |
|||
switch (msg.fixedHeader().messageType()) { |
|||
case CONNECT: |
|||
eventsFromClient.add(CONNECT); |
|||
processConnect(ctx, (MqttConnectMessage) msg); |
|||
break; |
|||
case DISCONNECT: |
|||
eventsFromClient.add(DISCONNECT); |
|||
ctx.close(); |
|||
break; |
|||
case PUBLISH: |
|||
// QoS 0 and 1 supported only here
|
|||
eventsFromClient.add(PUBLISH); |
|||
MqttPublishMessage mqttPubMsg = (MqttPublishMessage) msg; |
|||
ack(ctx, mqttPubMsg.variableHeader().packetId()); |
|||
break; |
|||
case PINGREQ: |
|||
// We will not handle PINGREQ and will not send any PINGRESP to simulate the MQTT server is down
|
|||
eventsFromClient.add(PINGREQ); |
|||
break; |
|||
default: |
|||
break; |
|||
} |
|||
} |
|||
|
|||
void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { |
|||
String userName = msg.payload().userName(); |
|||
String clientId = msg.payload().clientIdentifier(); |
|||
|
|||
log.warn("[{}][{}] Processing connect msg for client: {}!", sessionId, userName, clientId); |
|||
ctx.writeAndFlush(createMqttConnAckMsg(msg)); |
|||
} |
|||
|
|||
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectMessage msg) { |
|||
MqttFixedHeader mqttFixedHeader = |
|||
new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0); |
|||
MqttConnAckVariableHeader mqttConnAckVariableHeader = |
|||
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !msg.variableHeader().isCleanSession()); |
|||
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); |
|||
} |
|||
|
|||
private void ack(ChannelHandlerContext ctx, int msgId) { |
|||
if (msgId > 0) { |
|||
ctx.writeAndFlush(createMqttPubAckMsg(msgId)); |
|||
} |
|||
} |
|||
|
|||
public static MqttPubAckMessage createMqttPubAckMsg(int requestId) { |
|||
MqttFixedHeader mqttFixedHeader = |
|||
new MqttFixedHeader(PUBACK, false, AT_MOST_ONCE, false, 0); |
|||
MqttMessageIdVariableHeader mqttMsgIdVariableHeader = |
|||
MqttMessageIdVariableHeader.from(requestId); |
|||
return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader); |
|||
} |
|||
|
|||
@Override |
|||
public void operationComplete(Future<? super Void> future) { |
|||
log.trace("[{}] Channel closed!", sessionId); |
|||
} |
|||
} |
|||
@ -1,3 +0,0 @@ |
|||
junit.jupiter.execution.parallel.enabled = true |
|||
junit.jupiter.execution.parallel.mode.default = concurrent |
|||
junit.jupiter.execution.parallel.mode.classes.default = concurrent |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.api; |
|||
|
|||
public interface MqttClientSettings { |
|||
|
|||
int getRetransmissionMaxAttempts(); |
|||
|
|||
long getRetransmissionInitialDelayMillis(); |
|||
|
|||
double getRetransmissionJitterFactor(); |
|||
|
|||
} |
|||