491 changed files with 17768 additions and 6079 deletions
File diff suppressed because one or more lines are too long
@ -0,0 +1,78 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.shared; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.TbActor; |
|||
import org.thingsboard.server.actors.TbActorId; |
|||
import org.thingsboard.server.actors.TbStringActorId; |
|||
import org.thingsboard.server.actors.service.ContextAwareActor; |
|||
import org.thingsboard.server.actors.service.ContextBasedCreator; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.msg.TbActorMsg; |
|||
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; |
|||
import org.thingsboard.server.common.msg.queue.RuleEngineException; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
@Slf4j |
|||
public class RuleChainErrorActor extends ContextAwareActor { |
|||
|
|||
private final TenantId tenantId; |
|||
private final RuleEngineException error; |
|||
|
|||
private RuleChainErrorActor(ActorSystemContext systemContext, TenantId tenantId, RuleEngineException error) { |
|||
super(systemContext); |
|||
this.tenantId = tenantId; |
|||
this.error = error; |
|||
} |
|||
|
|||
@Override |
|||
protected boolean doProcess(TbActorMsg msg) { |
|||
if (msg instanceof RuleChainAwareMsg) { |
|||
log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg); |
|||
var rcMsg = (RuleChainAwareMsg) msg; |
|||
rcMsg.getMsg().getCallback().onFailure(error); |
|||
return true; |
|||
} else { |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
public static class ActorCreator extends ContextBasedCreator { |
|||
|
|||
private final TenantId tenantId; |
|||
private final RuleEngineException error; |
|||
|
|||
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) { |
|||
super(context); |
|||
this.tenantId = tenantId; |
|||
this.error = error; |
|||
} |
|||
|
|||
@Override |
|||
public TbActorId createActorId() { |
|||
return new TbStringActorId(UUID.randomUUID().toString()); |
|||
} |
|||
|
|||
@Override |
|||
public TbActor createActor() { |
|||
return new RuleChainErrorActor(context, tenantId, error); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,107 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import io.swagger.annotations.ApiOperation; |
|||
import io.swagger.annotations.ApiParam; |
|||
import io.swagger.annotations.ApiResponse; |
|||
import io.swagger.annotations.ApiResponses; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.http.HttpHeaders; |
|||
import org.springframework.http.MediaType; |
|||
import org.springframework.http.ResponseEntity; |
|||
import org.springframework.security.access.prepost.PreAuthorize; |
|||
import org.springframework.web.bind.annotation.PathVariable; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RequestMethod; |
|||
import org.springframework.web.bind.annotation.ResponseBody; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.dao.device.DeviceConnectivityService; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.security.permission.Operation; |
|||
import org.thingsboard.server.service.security.system.SystemSecurityService; |
|||
|
|||
import javax.servlet.http.HttpServletRequest; |
|||
import java.io.IOException; |
|||
import java.net.URISyntaxException; |
|||
|
|||
import static org.thingsboard.server.controller.ControllerConstants.DEVICE_ID; |
|||
import static org.thingsboard.server.controller.ControllerConstants.DEVICE_ID_PARAM_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.PROTOCOL; |
|||
import static org.thingsboard.server.controller.ControllerConstants.PROTOCOL_PARAM_DESCRIPTION; |
|||
import static org.thingsboard.server.controller.ControllerConstants.TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.PEM_CERT_FILE_NAME; |
|||
|
|||
@RestController |
|||
@TbCoreComponent |
|||
@RequestMapping("/api") |
|||
@RequiredArgsConstructor |
|||
@Slf4j |
|||
public class DeviceConnectivityController extends BaseController { |
|||
|
|||
private final DeviceConnectivityService deviceConnectivityService; |
|||
private final SystemSecurityService systemSecurityService; |
|||
|
|||
@ApiOperation(value = "Get commands to publish device telemetry (getDevicePublishTelemetryCommands)", |
|||
notes = "Fetch the list of commands to publish device telemetry based on device profile " + |
|||
"If the user has the authority of 'Tenant Administrator', the server checks that the device is owned by the same tenant. " + |
|||
"If the user has the authority of 'Customer User', the server checks that the device is assigned to the same customer. " + |
|||
TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH) |
|||
@ApiResponses(value = { |
|||
@ApiResponse(code = 200, message = "OK", |
|||
examples = @io.swagger.annotations.Example( |
|||
value = { |
|||
@io.swagger.annotations.ExampleProperty( |
|||
mediaType = "application/json", |
|||
value = "{\"http\":\"curl -v -X POST http://localhost:8080/api/v1/0ySs4FTOn5WU15XLmal8/telemetry --header Content-Type:application/json --data {temperature:25}\"," + |
|||
"\"mqtt\":\"mosquitto_pub -d -q 1 -h localhost -t v1/devices/me/telemetry -i myClient1 -u myUsername1 -P myPassword -m {temperature:25}\"," + |
|||
"\"coap\":\"coap-client -m POST coap://localhost:5683/api/v1/0ySs4FTOn5WU15XLmal8/telemetry -t json -e {temperature:25}\"}")}))}) |
|||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") |
|||
@RequestMapping(value = "/device-connectivity/{deviceId}", method = RequestMethod.GET) |
|||
@ResponseBody |
|||
public JsonNode getDevicePublishTelemetryCommands(@ApiParam(value = DEVICE_ID_PARAM_DESCRIPTION) |
|||
@PathVariable(DEVICE_ID) String strDeviceId, HttpServletRequest request) throws ThingsboardException, URISyntaxException { |
|||
checkParameter(DEVICE_ID, strDeviceId); |
|||
DeviceId deviceId = new DeviceId(toUUID(strDeviceId)); |
|||
Device device = checkDeviceId(deviceId, Operation.READ_CREDENTIALS); |
|||
|
|||
String baseUrl = systemSecurityService.getBaseUrl(getTenantId(), getCurrentUser().getCustomerId(), request); |
|||
return deviceConnectivityService.findDevicePublishTelemetryCommands(baseUrl, device); |
|||
} |
|||
|
|||
@ApiOperation(value = "Download server certificate using file path defined in device.connectivity properties (downloadServerCertificate)", notes = "Download server certificate.") |
|||
@RequestMapping(value = "/device-connectivity/{protocol}/certificate/download", method = RequestMethod.GET) |
|||
@ResponseBody |
|||
public ResponseEntity<org.springframework.core.io.Resource> downloadServerCertificate(@ApiParam(value = PROTOCOL_PARAM_DESCRIPTION) |
|||
@PathVariable(PROTOCOL) String protocol) throws ThingsboardException, IOException { |
|||
checkParameter(PROTOCOL, protocol); |
|||
var pemCert = |
|||
checkNotNull(deviceConnectivityService.getPemCertFile(protocol), protocol + " pem cert file is not found!"); |
|||
|
|||
return ResponseEntity.ok() |
|||
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=" + PEM_CERT_FILE_NAME) |
|||
.header("x-filename", PEM_CERT_FILE_NAME) |
|||
.contentLength(pemCert.contentLength()) |
|||
.contentType(MediaType.APPLICATION_OCTET_STREAM) |
|||
.body(pemCert); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,340 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.google.common.util.concurrent.ListeningExecutorService; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.mockito.AdditionalAnswers; |
|||
import org.mockito.Mockito; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Primary; |
|||
import org.springframework.test.context.ContextConfiguration; |
|||
import org.springframework.test.context.TestPropertySource; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.common.util.ThingsBoardExecutors; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.DeviceProfile; |
|||
import org.thingsboard.server.common.data.DeviceProfileType; |
|||
import org.thingsboard.server.common.data.DeviceTransportType; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.User; |
|||
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; |
|||
import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration; |
|||
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; |
|||
import org.thingsboard.server.common.data.device.profile.DeviceProfileData; |
|||
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; |
|||
import org.thingsboard.server.common.data.id.DeviceProfileId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.security.Authority; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentials; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentialsType; |
|||
import org.thingsboard.server.dao.device.DeviceDao; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.COAP; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.COAPS; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.DOCKER; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.HTTP; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.HTTPS; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.MQTT; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.MQTTS; |
|||
|
|||
@TestPropertySource(properties = { |
|||
"device.connectivity.https.enabled=true", |
|||
"device.connectivity.mqtts.enabled=true", |
|||
"device.connectivity.coaps.enabled=true", |
|||
}) |
|||
@ContextConfiguration(classes = {DeviceConnectivityControllerTest.Config.class}) |
|||
@DaoSqlTest |
|||
public class DeviceConnectivityControllerTest extends AbstractControllerTest { |
|||
static final TypeReference<PageData<Device>> PAGE_DATA_DEVICE_TYPE_REF = new TypeReference<>() { |
|||
}; |
|||
|
|||
private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/customTopic"; |
|||
private static final String CHECK_DOCUMENTATION = "Check documentation"; |
|||
|
|||
ListeningExecutorService executor; |
|||
|
|||
private Tenant savedTenant; |
|||
private User tenantAdmin; |
|||
private DeviceProfileId mqttDeviceProfileId; |
|||
private DeviceProfileId coapDeviceProfileId; |
|||
|
|||
static class Config { |
|||
@Bean |
|||
@Primary |
|||
public DeviceDao deviceDao(DeviceDao deviceDao) { |
|||
return Mockito.mock(DeviceDao.class, AdditionalAnswers.delegatesTo(deviceDao)); |
|||
} |
|||
} |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(8, getClass())); |
|||
|
|||
loginSysAdmin(); |
|||
|
|||
Tenant tenant = new Tenant(); |
|||
tenant.setTitle("My tenant"); |
|||
savedTenant = doPost("/api/tenant", tenant, Tenant.class); |
|||
Assert.assertNotNull(savedTenant); |
|||
|
|||
tenantAdmin = new User(); |
|||
tenantAdmin.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin.setTenantId(savedTenant.getId()); |
|||
tenantAdmin.setEmail("tenant2@thingsboard.org"); |
|||
tenantAdmin.setFirstName("Joe"); |
|||
tenantAdmin.setLastName("Downs"); |
|||
|
|||
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1"); |
|||
|
|||
DeviceProfile mqttProfile = new DeviceProfile(); |
|||
mqttProfile.setName("Mqtt device profile"); |
|||
mqttProfile.setType(DeviceProfileType.DEFAULT); |
|||
mqttProfile.setTransportType(DeviceTransportType.MQTT); |
|||
DeviceProfileData deviceProfileData = new DeviceProfileData(); |
|||
deviceProfileData.setConfiguration(new DefaultDeviceProfileConfiguration()); |
|||
MqttDeviceProfileTransportConfiguration transportConfiguration = new MqttDeviceProfileTransportConfiguration(); |
|||
transportConfiguration.setDeviceTelemetryTopic(DEVICE_TELEMETRY_TOPIC); |
|||
deviceProfileData.setTransportConfiguration(transportConfiguration); |
|||
mqttProfile.setProfileData(deviceProfileData); |
|||
mqttProfile.setDefault(false); |
|||
mqttProfile.setDefaultRuleChainId(null); |
|||
|
|||
mqttDeviceProfileId = doPost("/api/deviceProfile", mqttProfile, DeviceProfile.class).getId(); |
|||
|
|||
DeviceProfile coapProfile = new DeviceProfile(); |
|||
coapProfile.setName("Coap device profile"); |
|||
coapProfile.setType(DeviceProfileType.DEFAULT); |
|||
coapProfile.setTransportType(DeviceTransportType.COAP); |
|||
DeviceProfileData deviceProfileData2 = new DeviceProfileData(); |
|||
deviceProfileData2.setConfiguration(new DefaultDeviceProfileConfiguration()); |
|||
deviceProfileData2.setTransportConfiguration(new CoapDeviceProfileTransportConfiguration()); |
|||
coapProfile.setProfileData(deviceProfileData); |
|||
coapProfile.setDefault(false); |
|||
coapProfile.setDefaultRuleChainId(null); |
|||
|
|||
coapDeviceProfileId = doPost("/api/deviceProfile", coapProfile, DeviceProfile.class).getId(); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws Exception { |
|||
executor.shutdownNow(); |
|||
|
|||
loginSysAdmin(); |
|||
|
|||
doDelete("/api/tenant/" + savedTenant.getId().getId()) |
|||
.andExpect(status().isOk()); |
|||
} |
|||
|
|||
@Test |
|||
public void testFetchPublishTelemetryCommandsForDefaultDevice() throws Exception { |
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setType("default"); |
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
JsonNode commands = |
|||
doGetTyped("/api/device-connectivity/" + savedDevice.getId().getId(), new TypeReference<>() { |
|||
}); |
|||
|
|||
DeviceCredentials credentials = |
|||
doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); |
|||
|
|||
assertThat(commands).hasSize(3); |
|||
JsonNode httpCommands = commands.get(HTTP); |
|||
assertThat(httpCommands.get(HTTP).asText()).isEqualTo(String.format("curl -v -X POST http://localhost:8080/api/v1/%s/telemetry " + |
|||
"--header Content-Type:application/json --data \"{temperature:25}\"", |
|||
credentials.getCredentialsId())); |
|||
assertThat(httpCommands.get(HTTPS).asText()).isEqualTo(String.format("curl -v -X POST https://localhost:443/api/v1/%s/telemetry " + |
|||
"--header Content-Type:application/json --data \"{temperature:25}\"", |
|||
credentials.getCredentialsId())); |
|||
|
|||
|
|||
JsonNode mqttCommands = commands.get(MQTT); |
|||
assertThat(mqttCommands.get(MQTT).asText()).isEqualTo(String.format("mosquitto_pub -d -q 1 -h localhost -p 1883 -t v1/devices/me/telemetry " + |
|||
"-u %s -m \"{temperature:25}\"", |
|||
credentials.getCredentialsId())); |
|||
assertThat(mqttCommands.get(MQTTS).get(0).asText()).isEqualTo("curl -f -S -o tb-server-chain.pem http://localhost:80/api/device-connectivity/mqtts/certificate/download"); |
|||
assertThat(mqttCommands.get(MQTTS).get(1).asText()).isEqualTo(String.format("mosquitto_pub -d -q 1 --cafile tb-server-chain.pem -h localhost -p 8883 " + |
|||
"-t v1/devices/me/telemetry -u %s -m \"{temperature:25}\"", credentials.getCredentialsId())); |
|||
|
|||
JsonNode dockerMqttCommands = commands.get(MQTT).get(DOCKER); |
|||
assertThat(dockerMqttCommands.get(MQTT).asText()).isEqualTo(String.format("docker run --rm -it thingsboard/mosquitto-clients mosquitto_pub -d -q 1 -h localhost" + |
|||
" -p 1883 -t v1/devices/me/telemetry -u %s -m \"{temperature:25}\"", |
|||
credentials.getCredentialsId())); |
|||
assertThat(dockerMqttCommands.get(MQTTS).asText()).isEqualTo(String.format("docker run --rm -it thingsboard/mosquitto-clients " + |
|||
"/bin/sh -c \"curl -f -S -o tb-server-chain.pem http://localhost:80/api/device-connectivity/mqtts/certificate/download && " + |
|||
"mosquitto_pub -d -q 1 --cafile tb-server-chain.pem -h localhost -p 8883 -t v1/devices/me/telemetry -u %s -m \"{temperature:25}\"\"", |
|||
credentials.getCredentialsId())); |
|||
|
|||
JsonNode linuxCoapCommands = commands.get(COAP); |
|||
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -m POST coap://localhost:5683/api/v1/%s/telemetry " + |
|||
"-t json -e \"{temperature:25}\"", credentials.getCredentialsId())); |
|||
assertThat(linuxCoapCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -m POST coaps://localhost:5684/api/v1/%s/telemetry" + |
|||
" -t json -e \"{temperature:25}\"", credentials.getCredentialsId())); |
|||
} |
|||
|
|||
@Test |
|||
public void testFetchPublishTelemetryCommandsForMqttDeviceWithAccessToken() throws Exception { |
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setDeviceProfileId(mqttDeviceProfileId); |
|||
|
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
DeviceCredentials credentials = |
|||
doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); |
|||
|
|||
JsonNode commands = |
|||
doGetTyped("/api/device-connectivity/" + savedDevice.getId().getId(), new TypeReference<>() { |
|||
}); |
|||
assertThat(commands).hasSize(1); |
|||
|
|||
JsonNode mqttCommands = commands.get(MQTT); |
|||
assertThat(mqttCommands.get(MQTT).asText()).isEqualTo(String.format("mosquitto_pub -d -q 1 -h localhost -p 1883 -t %s " + |
|||
"-u %s -m \"{temperature:25}\"", DEVICE_TELEMETRY_TOPIC, credentials.getCredentialsId())); |
|||
assertThat(mqttCommands.get(MQTTS).get(0).asText()).isEqualTo("curl -f -S -o tb-server-chain.pem http://localhost:80/api/device-connectivity/mqtts/certificate/download"); |
|||
assertThat(mqttCommands.get(MQTTS).get(1).asText()).isEqualTo(String.format("mosquitto_pub -d -q 1 --cafile tb-server-chain.pem -h localhost -p 8883 " + |
|||
"-t %s -u %s -m \"{temperature:25}\"", DEVICE_TELEMETRY_TOPIC, credentials.getCredentialsId())); |
|||
|
|||
JsonNode dockerMqttCommands = commands.get(MQTT).get(DOCKER); |
|||
assertThat(dockerMqttCommands.get(MQTT).asText()).isEqualTo(String.format("docker run --rm -it thingsboard/mosquitto-clients mosquitto_pub -d -q 1 -h localhost" + |
|||
" -p 1883 -t %s -u %s -m \"{temperature:25}\"", |
|||
DEVICE_TELEMETRY_TOPIC, credentials.getCredentialsId())); |
|||
assertThat(dockerMqttCommands.get(MQTTS).asText()).isEqualTo(String.format("docker run --rm -it thingsboard/mosquitto-clients " + |
|||
"/bin/sh -c \"curl -f -S -o tb-server-chain.pem http://localhost:80/api/device-connectivity/mqtts/certificate/download && " + |
|||
"mosquitto_pub -d -q 1 --cafile tb-server-chain.pem -h localhost -p 8883 -t %s -u %s -m \"{temperature:25}\"\"", |
|||
DEVICE_TELEMETRY_TOPIC, credentials.getCredentialsId())); |
|||
} |
|||
|
|||
@Test |
|||
public void testFetchPublishTelemetryCommandsForDeviceWithMqttBasicCreds() throws Exception { |
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setDeviceProfileId(mqttDeviceProfileId); |
|||
|
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
DeviceCredentials credentials = |
|||
doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); |
|||
credentials.setCredentialsId(null); |
|||
credentials.setCredentialsType(DeviceCredentialsType.MQTT_BASIC); |
|||
BasicMqttCredentials basicMqttCredentials = new BasicMqttCredentials(); |
|||
String clientId = "testClientId"; |
|||
String userName = "testUsername"; |
|||
String password = "testPassword"; |
|||
basicMqttCredentials.setClientId(clientId); |
|||
basicMqttCredentials.setUserName(userName); |
|||
basicMqttCredentials.setPassword(password); |
|||
credentials.setCredentialsValue(JacksonUtil.toString(basicMqttCredentials)); |
|||
doPost("/api/device/credentials", credentials) |
|||
.andExpect(status().isOk()); |
|||
|
|||
JsonNode commands = |
|||
doGetTyped("/api/device-connectivity/" + savedDevice.getId().getId(), new TypeReference<>() { |
|||
}); |
|||
assertThat(commands).hasSize(1); |
|||
|
|||
JsonNode mqttCommands = commands.get(MQTT); |
|||
assertThat(mqttCommands.get(MQTT).asText()).isEqualTo(String.format("mosquitto_pub -d -q 1 -h localhost -p 1883 -t %s " + |
|||
"-i %s -u %s -P %s -m \"{temperature:25}\"", DEVICE_TELEMETRY_TOPIC, clientId, userName, password)); |
|||
assertThat(mqttCommands.get(MQTTS).get(0).asText()).isEqualTo("curl -f -S -o tb-server-chain.pem http://localhost:80/api/device-connectivity/mqtts/certificate/download"); |
|||
assertThat(mqttCommands.get(MQTTS).get(1).asText()).isEqualTo(String.format("mosquitto_pub -d -q 1 --cafile tb-server-chain.pem -h localhost -p 8883 " + |
|||
"-t %s -i %s -u %s -P %s -m \"{temperature:25}\"", DEVICE_TELEMETRY_TOPIC, clientId, userName, password)); |
|||
|
|||
JsonNode dockerMqttCommands = commands.get(MQTT).get(DOCKER); |
|||
assertThat(dockerMqttCommands.get(MQTT).asText()).isEqualTo(String.format("docker run --rm -it thingsboard/mosquitto-clients mosquitto_pub -d -q 1 -h localhost" + |
|||
" -p 1883 -t %s -i %s -u %s -P %s -m \"{temperature:25}\"", |
|||
DEVICE_TELEMETRY_TOPIC, clientId, userName, password)); |
|||
assertThat(dockerMqttCommands.get(MQTTS).asText()).isEqualTo(String.format("docker run --rm -it thingsboard/mosquitto-clients " + |
|||
"/bin/sh -c \"curl -f -S -o tb-server-chain.pem http://localhost:80/api/device-connectivity/mqtts/certificate/download && " + |
|||
"mosquitto_pub -d -q 1 --cafile tb-server-chain.pem -h localhost -p 8883 -t %s -i %s -u %s -P %s -m \"{temperature:25}\"\"", |
|||
DEVICE_TELEMETRY_TOPIC, clientId, userName, password)); |
|||
} |
|||
|
|||
@Test |
|||
public void testFetchPublishTelemetryCommandsForDeviceWithX509Creds() throws Exception { |
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setDeviceProfileId(mqttDeviceProfileId); |
|||
|
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
DeviceCredentials credentials = |
|||
doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); |
|||
credentials.setCredentialsId(null); |
|||
credentials.setCredentialsType(DeviceCredentialsType.X509_CERTIFICATE); |
|||
credentials.setCredentialsValue("testValue"); |
|||
doPost("/api/device/credentials", credentials) |
|||
.andExpect(status().isOk()); |
|||
|
|||
JsonNode commands = |
|||
doGetTyped("/api/device-connectivity/" + savedDevice.getId().getId(), new TypeReference<>() { |
|||
}); |
|||
assertThat(commands).hasSize(1); |
|||
assertThat(commands.get(MQTT).get(MQTTS).asText()).isEqualTo(CHECK_DOCUMENTATION); |
|||
assertThat(commands.get(MQTT).get(DOCKER)).isNull(); |
|||
} |
|||
|
|||
@Test |
|||
public void testFetchPublishTelemetryCommandsForCoapDevice() throws Exception { |
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setDeviceProfileId(coapDeviceProfileId); |
|||
|
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
DeviceCredentials credentials = |
|||
doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); |
|||
|
|||
JsonNode commands = |
|||
doGetTyped("/api/device-connectivity/" + savedDevice.getId().getId(), new TypeReference<>() { |
|||
}); |
|||
assertThat(commands).hasSize(1); |
|||
|
|||
JsonNode linuxCommands = commands.get(COAP); |
|||
assertThat(linuxCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -m POST coap://localhost:5683/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", |
|||
credentials.getCredentialsId())); |
|||
assertThat(linuxCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -m POST coaps://localhost:5684/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", |
|||
credentials.getCredentialsId())); |
|||
} |
|||
|
|||
@Test |
|||
public void testFetchPublishTelemetryCommandsForCoapDeviceWithX509Creds() throws Exception { |
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setDeviceProfileId(coapDeviceProfileId); |
|||
|
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
DeviceCredentials credentials = |
|||
doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); |
|||
credentials.setCredentialsId(null); |
|||
credentials.setCredentialsType(DeviceCredentialsType.X509_CERTIFICATE); |
|||
credentials.setCredentialsValue("testValue"); |
|||
doPost("/api/device/credentials", credentials) |
|||
.andExpect(status().isOk()); |
|||
|
|||
JsonNode commands = |
|||
doGetTyped("/api/device-connectivity/" + savedDevice.getId().getId(), new TypeReference<>() { |
|||
}); |
|||
assertThat(commands).hasSize(1); |
|||
assertThat(commands.get(COAP).get(COAPS).asText()).isEqualTo(CHECK_DOCUMENTATION); |
|||
} |
|||
} |
|||
@ -0,0 +1,160 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.plugin; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.awaitility.Awaitility; |
|||
import org.junit.jupiter.api.AfterEach; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.mockito.Mockito; |
|||
import org.springframework.web.socket.CloseStatus; |
|||
import org.springframework.web.socket.adapter.NativeWebSocketSession; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.service.ws.WebSocketSessionRef; |
|||
|
|||
import javax.websocket.RemoteEndpoint; |
|||
import javax.websocket.SendHandler; |
|||
import javax.websocket.SendResult; |
|||
import javax.websocket.Session; |
|||
import java.io.IOException; |
|||
import java.util.Collection; |
|||
import java.util.List; |
|||
import java.util.concurrent.ConcurrentLinkedQueue; |
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.stream.Collectors; |
|||
import java.util.stream.IntStream; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.anyString; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.BDDMockito.willAnswer; |
|||
import static org.mockito.BDDMockito.willDoNothing; |
|||
import static org.mockito.BDDMockito.willReturn; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.never; |
|||
import static org.mockito.Mockito.spy; |
|||
import static org.mockito.Mockito.times; |
|||
import static org.mockito.Mockito.verify; |
|||
|
|||
@Slf4j |
|||
class TbWebSocketHandlerTest { |
|||
|
|||
TbWebSocketHandler wsHandler; |
|||
NativeWebSocketSession session; |
|||
Session nativeSession; |
|||
RemoteEndpoint.Async asyncRemote; |
|||
WebSocketSessionRef sessionRef; |
|||
int maxMsgQueuePerSession; |
|||
TbWebSocketHandler.SessionMetaData sendHandler; |
|||
ExecutorService executor; |
|||
|
|||
@BeforeEach |
|||
void setUp() throws IOException { |
|||
maxMsgQueuePerSession = 100; |
|||
executor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); |
|||
wsHandler = spy(new TbWebSocketHandler()); |
|||
willDoNothing().given(wsHandler).close(any(), any()); |
|||
session = mock(NativeWebSocketSession.class); |
|||
nativeSession = mock(Session.class); |
|||
willReturn(nativeSession).given(session).getNativeSession(Session.class); |
|||
asyncRemote = mock(RemoteEndpoint.Async.class); |
|||
willReturn(asyncRemote).given(nativeSession).getAsyncRemote(); |
|||
sessionRef = mock(WebSocketSessionRef.class, Mockito.RETURNS_DEEP_STUBS); //prevent NPE on logs
|
|||
sendHandler = spy(wsHandler.new SessionMetaData(session, sessionRef, maxMsgQueuePerSession)); |
|||
} |
|||
|
|||
@AfterEach |
|||
void tearDown() { |
|||
if (executor != null) { |
|||
executor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
void sendHandler_sendMsg_parallel_no_race() throws InterruptedException { |
|||
CountDownLatch finishLatch = new CountDownLatch(maxMsgQueuePerSession * 2); |
|||
AtomicInteger sendersCount = new AtomicInteger(); |
|||
willAnswer(invocation -> { |
|||
assertThat(sendersCount.incrementAndGet()).as("no race").isEqualTo(1); |
|||
String text = invocation.getArgument(0); |
|||
SendHandler onResultHandler = invocation.getArgument(1); |
|||
SendResult sendResult = new SendResult(); |
|||
executor.submit(() -> { |
|||
sendersCount.decrementAndGet(); |
|||
onResultHandler.onResult(sendResult); |
|||
finishLatch.countDown(); |
|||
}); |
|||
return null; |
|||
}).given(asyncRemote).sendText(anyString(), any()); |
|||
|
|||
assertThat(sendHandler.isSending.get()).as("sendHandler not is in sending state").isFalse(); |
|||
//first batch
|
|||
IntStream.range(0, maxMsgQueuePerSession).parallel().forEach(i -> sendHandler.sendMsg("hello " + i)); |
|||
Awaitility.await("first batch processed").atMost(30, TimeUnit.SECONDS).until(() -> finishLatch.getCount() == maxMsgQueuePerSession); |
|||
assertThat(sendHandler.isSending.get()).as("sendHandler not is in sending state").isFalse(); |
|||
//second batch - to test pause between big msg batches
|
|||
IntStream.range(100, 100 + maxMsgQueuePerSession).parallel().forEach(i -> sendHandler.sendMsg("hello " + i)); |
|||
assertThat(finishLatch.await(30, TimeUnit.SECONDS)).as("all callbacks fired").isTrue(); |
|||
|
|||
verify(sendHandler, never()).closeSession(any()); |
|||
verify(sendHandler, times(maxMsgQueuePerSession * 2)).onResult(any()); |
|||
assertThat(sendHandler.isSending.get()).as("sendHandler not is in sending state").isFalse(); |
|||
} |
|||
|
|||
@Test |
|||
void sendHandler_sendMsg_message_order() throws InterruptedException { |
|||
CountDownLatch finishLatch = new CountDownLatch(maxMsgQueuePerSession); |
|||
Collection<String> outputs = new ConcurrentLinkedQueue<>(); |
|||
willAnswer(invocation -> { |
|||
String text = invocation.getArgument(0); |
|||
outputs.add(text); |
|||
SendHandler onResultHandler = invocation.getArgument(1); |
|||
SendResult sendResult = new SendResult(); |
|||
executor.submit(() -> { |
|||
onResultHandler.onResult(sendResult); |
|||
finishLatch.countDown(); |
|||
}); |
|||
return null; |
|||
}).given(asyncRemote).sendText(anyString(), any()); |
|||
|
|||
List<String> inputs = IntStream.range(0, maxMsgQueuePerSession).mapToObj(i -> "msg " + i).collect(Collectors.toList()); |
|||
inputs.forEach(s -> sendHandler.sendMsg(s)); |
|||
|
|||
assertThat(finishLatch.await(30, TimeUnit.SECONDS)).as("all callbacks fired").isTrue(); |
|||
assertThat(outputs).as("inputs exactly the same as outputs").containsExactlyElementsOf(inputs); |
|||
|
|||
verify(sendHandler, never()).closeSession(any()); |
|||
verify(sendHandler, times(maxMsgQueuePerSession)).onResult(any()); |
|||
} |
|||
|
|||
@Test |
|||
void sendHandler_sendMsg_queue_size_exceed() { |
|||
willDoNothing().given(asyncRemote).sendText(anyString(), any()); // send text will never call back, so queue will grow each sendMsg
|
|||
sendHandler.sendMsg("first message to stay in-flight all the time during this test"); |
|||
IntStream.range(0, maxMsgQueuePerSession).parallel().forEach(i -> sendHandler.sendMsg("hello " + i)); |
|||
verify(sendHandler, never()).closeSession(any()); |
|||
sendHandler.sendMsg("excessive message"); |
|||
verify(sendHandler, times(1)).closeSession(eq(new CloseStatus(1008, "Max pending updates limit reached!"))); |
|||
verify(asyncRemote, times(1)).sendText(anyString(), any()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache.resourceInfo; |
|||
|
|||
import lombok.Builder; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.Getter; |
|||
import lombok.RequiredArgsConstructor; |
|||
import org.thingsboard.server.common.data.id.TbResourceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
@Getter |
|||
@EqualsAndHashCode |
|||
@RequiredArgsConstructor |
|||
@Builder |
|||
public class ResourceInfoCacheKey implements Serializable { |
|||
|
|||
private final TenantId tenantId; |
|||
private final TbResourceId tbResourceId; |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return tenantId + "_" + tbResourceId; |
|||
} |
|||
} |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache.resourceInfo; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|||
import org.springframework.cache.CacheManager; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.cache.CaffeineTbTransactionalCache; |
|||
import org.thingsboard.server.common.data.CacheConstants; |
|||
import org.thingsboard.server.common.data.TbResourceInfo; |
|||
|
|||
|
|||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) |
|||
@Service("ResourceInfoCache") |
|||
public class ResourceInfoCaffeineCache extends CaffeineTbTransactionalCache<ResourceInfoCacheKey, TbResourceInfo> { |
|||
|
|||
public ResourceInfoCaffeineCache(CacheManager cacheManager) { |
|||
super(cacheManager, CacheConstants.RESOURCE_INFO_CACHE); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache.resourceInfo; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.id.TbResourceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
@Data |
|||
public class ResourceInfoEvictEvent { |
|||
private final TenantId tenantId; |
|||
private final TbResourceId resourceId; |
|||
} |
|||
@ -0,0 +1,35 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.cache.resourceInfo; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|||
import org.springframework.data.redis.connection.RedisConnectionFactory; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.cache.CacheSpecsMap; |
|||
import org.thingsboard.server.cache.RedisTbTransactionalCache; |
|||
import org.thingsboard.server.cache.TBRedisCacheConfiguration; |
|||
import org.thingsboard.server.cache.TbFSTRedisSerializer; |
|||
import org.thingsboard.server.common.data.CacheConstants; |
|||
import org.thingsboard.server.common.data.TbResourceInfo; |
|||
|
|||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") |
|||
@Service("ResourceInfoCache") |
|||
public class ResourceInfoRedisCache extends RedisTbTransactionalCache<ResourceInfoCacheKey, TbResourceInfo> { |
|||
|
|||
public ResourceInfoRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { |
|||
super(CacheConstants.RESOURCE_INFO_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbFSTRedisSerializer<>()); |
|||
} |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.device; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import org.springframework.core.io.Resource; |
|||
import org.thingsboard.server.common.data.Device; |
|||
|
|||
import java.net.URISyntaxException; |
|||
|
|||
public interface DeviceConnectivityService { |
|||
|
|||
JsonNode findDevicePublishTelemetryCommands(String baseUrl, Device device) throws URISyntaxException; |
|||
|
|||
Resource getPemCertFile(String protocol); |
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.msg; |
|||
|
|||
public interface TbActorError { |
|||
|
|||
boolean isUnrecoverable(); |
|||
|
|||
} |
|||
@ -0,0 +1,189 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.discovery; |
|||
|
|||
import org.apache.curator.framework.CuratorFramework; |
|||
import org.apache.curator.framework.imps.CuratorFrameworkState; |
|||
import org.apache.curator.framework.recipes.cache.ChildData; |
|||
import org.apache.curator.framework.recipes.cache.PathChildrenCache; |
|||
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.mockito.Mock; |
|||
import org.mockito.Mockito; |
|||
import org.mockito.junit.MockitoJUnitRunner; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
|
|||
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED; |
|||
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED; |
|||
import static org.junit.Assert.assertEquals; |
|||
import static org.junit.Assert.assertTrue; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.Mockito.never; |
|||
import static org.mockito.Mockito.reset; |
|||
import static org.mockito.Mockito.times; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.mockito.Mockito.when; |
|||
|
|||
@RunWith(MockitoJUnitRunner.class) |
|||
public class ZkDiscoveryServiceTest { |
|||
|
|||
@Mock |
|||
private TbServiceInfoProvider serviceInfoProvider; |
|||
|
|||
@Mock |
|||
private PartitionService partitionService; |
|||
|
|||
@Mock |
|||
private CuratorFramework client; |
|||
|
|||
@Mock |
|||
private PathChildrenCache cache; |
|||
|
|||
@Mock |
|||
private CuratorFramework curatorFramework; |
|||
|
|||
private ZkDiscoveryService zkDiscoveryService; |
|||
|
|||
private static final long RECALCULATE_DELAY = 100L; |
|||
|
|||
final TransportProtos.ServiceInfo currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-0").build(); |
|||
final ChildData currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray()); |
|||
final TransportProtos.ServiceInfo childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-1").build(); |
|||
final ChildData childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray()); |
|||
|
|||
@Before |
|||
public void setup() { |
|||
zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); |
|||
ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); |
|||
when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); |
|||
ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); |
|||
ReflectionTestUtils.setField(zkDiscoveryService, "client", client); |
|||
ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache); |
|||
ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010"); |
|||
ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService); |
|||
ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY); |
|||
ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard"); |
|||
|
|||
when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo); |
|||
|
|||
List<ChildData> dataList = new ArrayList<>(); |
|||
dataList.add(currentData); |
|||
when(cache.getCurrentData()).thenReturn(dataList); |
|||
} |
|||
|
|||
@Test |
|||
public void restartNodeInTimeTest() throws Exception { |
|||
startNode(childData); |
|||
|
|||
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); |
|||
|
|||
reset(partitionService); |
|||
|
|||
stopNode(childData); |
|||
|
|||
assertEquals(1, zkDiscoveryService.delayedTasks.size()); |
|||
|
|||
verify(partitionService, never()).recalculatePartitions(any(), any()); |
|||
|
|||
startNode(childData); |
|||
|
|||
verify(partitionService, never()).recalculatePartitions(any(), any()); |
|||
|
|||
Thread.sleep(RECALCULATE_DELAY * 2); |
|||
|
|||
verify(partitionService, never()).recalculatePartitions(any(), any()); |
|||
|
|||
assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); |
|||
} |
|||
|
|||
@Test |
|||
public void restartNodeNotInTimeTest() throws Exception { |
|||
startNode(childData); |
|||
|
|||
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); |
|||
|
|||
reset(partitionService); |
|||
|
|||
stopNode(childData); |
|||
|
|||
assertEquals(1, zkDiscoveryService.delayedTasks.size()); |
|||
|
|||
Thread.sleep(RECALCULATE_DELAY * 2); |
|||
|
|||
assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); |
|||
|
|||
startNode(childData); |
|||
|
|||
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(Collections.emptyList())); |
|||
|
|||
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); |
|||
|
|||
reset(partitionService); |
|||
} |
|||
|
|||
@Test |
|||
public void startAnotherNodeDuringRestartTest() throws Exception { |
|||
var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-transport").build(); |
|||
var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray()); |
|||
|
|||
startNode(childData); |
|||
|
|||
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); |
|||
|
|||
reset(partitionService); |
|||
|
|||
stopNode(childData); |
|||
|
|||
assertEquals(1, zkDiscoveryService.delayedTasks.size()); |
|||
|
|||
startNode(anotherData); |
|||
|
|||
assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); |
|||
|
|||
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo))); |
|||
reset(partitionService); |
|||
|
|||
Thread.sleep(RECALCULATE_DELAY * 2); |
|||
|
|||
verify(partitionService, never()).recalculatePartitions(any(), any()); |
|||
|
|||
startNode(childData); |
|||
|
|||
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo, childInfo))); |
|||
} |
|||
|
|||
private void startNode(ChildData data) throws Exception { |
|||
cache.getCurrentData().add(data); |
|||
zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_ADDED, data)); |
|||
} |
|||
|
|||
private void stopNode(ChildData data) throws Exception { |
|||
cache.getCurrentData().remove(data); |
|||
zkDiscoveryService.childEvent(curatorFramework, new PathChildrenCacheEvent(CHILD_REMOVED, data)); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.model; |
|||
|
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.BDDMockito.willReturn; |
|||
import static org.mockito.Mockito.mock; |
|||
|
|||
class LwM2MModelConfigServiceImplTest { |
|||
|
|||
LwM2MModelConfigServiceImpl service; |
|||
TbLwM2MModelConfigStore modelStore; |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
service = new LwM2MModelConfigServiceImpl(); |
|||
modelStore = mock(TbLwM2MModelConfigStore.class); |
|||
service.modelStore = modelStore; |
|||
} |
|||
|
|||
@Test |
|||
void testInitWithDuplicatedModels() { |
|||
LwM2MModelConfig config = new LwM2MModelConfig("urn:imei:951358811362976"); |
|||
List<LwM2MModelConfig> models = List.of(config, config); |
|||
willReturn(models).given(modelStore).getAll(); |
|||
service.init(); |
|||
assertThat(service.currentModelConfigs).containsExactlyEntriesOf(Map.of(config.getEndpoint(), config)); |
|||
} |
|||
|
|||
@Test |
|||
void testInitWithNonUniqueEndpoints() { |
|||
LwM2MModelConfig configAlfa = new LwM2MModelConfig("urn:imei:951358811362976"); |
|||
LwM2MModelConfig configBravo = new LwM2MModelConfig("urn:imei:151358811362976"); |
|||
LwM2MModelConfig configDelta = new LwM2MModelConfig("urn:imei:151358811362976"); |
|||
assertThat(configBravo.getEndpoint()).as("non-unique endpoints provided").isEqualTo(configDelta.getEndpoint()); |
|||
List<LwM2MModelConfig> models = List.of(configAlfa, configBravo, configDelta); |
|||
willReturn(models).given(modelStore).getAll(); |
|||
service.init(); |
|||
assertThat(service.currentModelConfigs).containsExactlyInAnyOrderEntriesOf(Map.of( |
|||
configAlfa.getEndpoint(), configAlfa, |
|||
configBravo.getEndpoint(), configBravo |
|||
)); |
|||
} |
|||
|
|||
@Test |
|||
void testInitWithEmptyModels() { |
|||
willReturn(Collections.emptyList()).given(modelStore).getAll(); |
|||
service.init(); |
|||
assertThat(service.currentModelConfigs).isEmpty(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,34 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.device; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
import java.util.Map; |
|||
|
|||
@Configuration |
|||
@ConfigurationProperties(prefix = "device") |
|||
@Data |
|||
public class DeviceConnectivityConfiguration { |
|||
private Map<String, DeviceConnectivityInfo> connectivity; |
|||
|
|||
public boolean isEnabled(String protocol) { |
|||
var info = connectivity.get(protocol); |
|||
return info != null && info.isEnabled(); |
|||
} |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.device; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class DeviceConnectivityInfo { |
|||
private boolean enabled; |
|||
private String host; |
|||
private String port; |
|||
private String pemCertFile; |
|||
} |
|||
@ -0,0 +1,268 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.device; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.fasterxml.jackson.databind.node.ArrayNode; |
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.core.io.ClassPathResource; |
|||
import org.springframework.core.io.Resource; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.DeviceProfile; |
|||
import org.thingsboard.server.common.data.DeviceTransportType; |
|||
import org.thingsboard.server.common.data.ResourceUtils; |
|||
import org.thingsboard.server.common.data.StringUtils; |
|||
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentials; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentialsType; |
|||
import org.thingsboard.server.dao.util.DeviceConnectivityUtil; |
|||
|
|||
import java.net.URI; |
|||
import java.net.URISyntaxException; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
|
|||
import static org.thingsboard.server.dao.service.Validator.validateId; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.CHECK_DOCUMENTATION; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.COAP; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.COAPS; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.DOCKER; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.HTTP; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.HTTPS; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.MQTT; |
|||
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.MQTTS; |
|||
|
|||
@Service("DeviceConnectivityDaoService") |
|||
@Slf4j |
|||
public class DeviceConnectivityServiceImpl implements DeviceConnectivityService { |
|||
|
|||
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; |
|||
public static final String INCORRECT_DEVICE_ID = "Incorrect deviceId "; |
|||
public static final String DEFAULT_DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry"; |
|||
|
|||
@Autowired |
|||
private DeviceCredentialsService deviceCredentialsService; |
|||
|
|||
@Autowired |
|||
private DeviceProfileService deviceProfileService; |
|||
|
|||
@Autowired |
|||
private DeviceConnectivityConfiguration deviceConnectivityConfiguration; |
|||
|
|||
@Override |
|||
public JsonNode findDevicePublishTelemetryCommands(String baseUrl, Device device) throws URISyntaxException { |
|||
DeviceId deviceId = device.getId(); |
|||
log.trace("Executing findDevicePublishTelemetryCommands [{}]", deviceId); |
|||
validateId(deviceId, INCORRECT_DEVICE_ID + deviceId); |
|||
|
|||
DeviceCredentials creds = deviceCredentialsService.findDeviceCredentialsByDeviceId(device.getTenantId(), deviceId); |
|||
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(device.getTenantId(), device.getDeviceProfileId()); |
|||
DeviceTransportType transportType = deviceProfile.getTransportType(); |
|||
|
|||
ObjectNode commands = JacksonUtil.newObjectNode(); |
|||
switch (transportType) { |
|||
case DEFAULT: |
|||
Optional.ofNullable(getHttpTransportPublishCommands(baseUrl, creds)) |
|||
.ifPresent(v -> commands.set(HTTP, v)); |
|||
Optional.ofNullable(getMqttTransportPublishCommands(baseUrl, creds)) |
|||
.ifPresent(v -> commands.set(MQTT, v)); |
|||
Optional.ofNullable(getCoapTransportPublishCommands(baseUrl, creds)) |
|||
.ifPresent(v -> commands.set(COAP, v)); |
|||
break; |
|||
case MQTT: |
|||
MqttDeviceProfileTransportConfiguration transportConfiguration = |
|||
(MqttDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration(); |
|||
//TODO: add sparkplug command with emulator (check SSL)
|
|||
if (transportConfiguration.isSparkplug()) { |
|||
ObjectNode sparkplug = JacksonUtil.newObjectNode(); |
|||
sparkplug.put("sparkplug", CHECK_DOCUMENTATION); |
|||
commands.set(MQTT, sparkplug); |
|||
} else { |
|||
String topicName = transportConfiguration.getDeviceTelemetryTopic(); |
|||
|
|||
Optional.ofNullable(getMqttTransportPublishCommands(baseUrl, topicName, creds)) |
|||
.ifPresent(v -> commands.set(MQTT, v)); |
|||
} |
|||
break; |
|||
case COAP: |
|||
Optional.ofNullable(getCoapTransportPublishCommands(baseUrl, creds)) |
|||
.ifPresent(v -> commands.set(COAP, v)); |
|||
break; |
|||
default: |
|||
commands.put(transportType.name(), CHECK_DOCUMENTATION); |
|||
} |
|||
return commands; |
|||
} |
|||
|
|||
@Override |
|||
public Resource getPemCertFile(String protocol) { |
|||
String certFilePath = deviceConnectivityConfiguration.getConnectivity() |
|||
.get(protocol) |
|||
.getPemCertFile(); |
|||
|
|||
if (StringUtils.isNotBlank(certFilePath) && ResourceUtils.resourceExists(this, certFilePath)) { |
|||
return new ClassPathResource(certFilePath); |
|||
} else { |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
private JsonNode getHttpTransportPublishCommands(String defaultHostname, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
ObjectNode httpCommands = JacksonUtil.newObjectNode(); |
|||
Optional.ofNullable(getHttpPublishCommand(HTTP, defaultHostname, deviceCredentials)) |
|||
.ifPresent(v -> httpCommands.put(HTTP, v)); |
|||
Optional.ofNullable(getHttpPublishCommand(HTTPS, defaultHostname, deviceCredentials)) |
|||
.ifPresent(v -> httpCommands.put(HTTPS, v)); |
|||
return httpCommands.isEmpty() ? null : httpCommands; |
|||
} |
|||
|
|||
private String getHttpPublishCommand(String protocol, String baseUrl, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
DeviceConnectivityInfo properties = deviceConnectivityConfiguration.getConnectivity().get(protocol); |
|||
if (properties == null || !properties.isEnabled() || |
|||
deviceCredentials.getCredentialsType() != DeviceCredentialsType.ACCESS_TOKEN) { |
|||
return null; |
|||
} |
|||
String hostName = getHost(baseUrl, properties); |
|||
String port = properties.getPort().isEmpty() ? "" : ":" + properties.getPort(); |
|||
|
|||
return DeviceConnectivityUtil.getHttpPublishCommand(protocol, hostName, port, deviceCredentials); |
|||
} |
|||
|
|||
private JsonNode getMqttTransportPublishCommands(String baseUrl, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
return getMqttTransportPublishCommands(baseUrl, DEFAULT_DEVICE_TELEMETRY_TOPIC, deviceCredentials); |
|||
} |
|||
|
|||
private JsonNode getMqttTransportPublishCommands(String baseUrl, String topic, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
ObjectNode mqttCommands = JacksonUtil.newObjectNode(); |
|||
|
|||
if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.X509_CERTIFICATE) { |
|||
mqttCommands.put(MQTTS, CHECK_DOCUMENTATION); |
|||
return mqttCommands; |
|||
} |
|||
|
|||
ObjectNode dockerMqttCommands = JacksonUtil.newObjectNode(); |
|||
|
|||
if (deviceConnectivityConfiguration.isEnabled(MQTT)) { |
|||
Optional.ofNullable(getMqttPublishCommand(baseUrl, topic, deviceCredentials)). |
|||
ifPresent(v -> mqttCommands.put(MQTT, v)); |
|||
|
|||
Optional.ofNullable(getDockerMqttPublishCommand(MQTT, baseUrl, topic, deviceCredentials)) |
|||
.ifPresent(v -> dockerMqttCommands.put(MQTT, v)); |
|||
} |
|||
|
|||
if (deviceConnectivityConfiguration.isEnabled(MQTTS)) { |
|||
List<String> mqttsPublishCommand = getMqttsPublishCommand(baseUrl, topic, deviceCredentials); |
|||
if (mqttsPublishCommand != null) { |
|||
ArrayNode arrayNode = mqttCommands.putArray(MQTTS); |
|||
mqttsPublishCommand.forEach(arrayNode::add); |
|||
} |
|||
|
|||
Optional.ofNullable(getDockerMqttPublishCommand(MQTTS, baseUrl, topic, deviceCredentials)) |
|||
.ifPresent(v -> dockerMqttCommands.put(MQTTS, v)); |
|||
} |
|||
|
|||
if (!dockerMqttCommands.isEmpty()) { |
|||
mqttCommands.set(DOCKER, dockerMqttCommands); |
|||
} |
|||
return mqttCommands.isEmpty() ? null : mqttCommands; |
|||
} |
|||
|
|||
private String getMqttPublishCommand(String baseUrl, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
DeviceConnectivityInfo properties = deviceConnectivityConfiguration.getConnectivity().get(MQTT); |
|||
String mqttHost = getHost(baseUrl, properties); |
|||
String mqttPort = properties.getPort().isEmpty() ? null : properties.getPort(); |
|||
return DeviceConnectivityUtil.getMqttPublishCommand(MQTT, mqttHost, mqttPort, deviceTelemetryTopic, deviceCredentials); |
|||
} |
|||
|
|||
private List<String> getMqttsPublishCommand(String baseUrl, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
DeviceConnectivityInfo properties = deviceConnectivityConfiguration.getConnectivity().get(MQTTS); |
|||
String mqttHost = getHost(baseUrl, properties); |
|||
String mqttPort = properties.getPort().isEmpty() ? null : properties.getPort(); |
|||
String pubCommand = DeviceConnectivityUtil.getMqttPublishCommand(MQTTS, mqttHost, mqttPort, deviceTelemetryTopic, deviceCredentials); |
|||
|
|||
ArrayList<String> commands = new ArrayList<>(); |
|||
if (pubCommand != null) { |
|||
commands.add(DeviceConnectivityUtil.getCurlPemCertCommand(baseUrl, MQTTS)); |
|||
commands.add(pubCommand); |
|||
return commands; |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
private String getDockerMqttPublishCommand(String protocol, String baseUrl, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
DeviceConnectivityInfo properties = deviceConnectivityConfiguration.getConnectivity().get(protocol); |
|||
String mqttHost = getHost(baseUrl, properties); |
|||
String mqttPort = properties.getPort().isEmpty() ? null : properties.getPort(); |
|||
return DeviceConnectivityUtil.getDockerMqttPublishCommand(protocol, baseUrl, mqttHost, mqttPort, deviceTelemetryTopic, deviceCredentials); |
|||
} |
|||
|
|||
private JsonNode getCoapTransportPublishCommands(String baseUrl, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
ObjectNode coapCommands = JacksonUtil.newObjectNode(); |
|||
|
|||
if (deviceCredentials.getCredentialsType() == DeviceCredentialsType.X509_CERTIFICATE) { |
|||
coapCommands.put(COAPS, CHECK_DOCUMENTATION); |
|||
return coapCommands; |
|||
} |
|||
|
|||
ObjectNode dockerCoapCommands = JacksonUtil.newObjectNode(); |
|||
|
|||
if (deviceConnectivityConfiguration.isEnabled(COAP)) { |
|||
Optional.ofNullable(getCoapPublishCommand(COAP, baseUrl, deviceCredentials)) |
|||
.ifPresent(v -> coapCommands.put(COAP, v)); |
|||
|
|||
Optional.ofNullable(getDockerCoapPublishCommand(COAP, baseUrl, deviceCredentials)) |
|||
.ifPresent(v -> dockerCoapCommands.put(COAP, v)); |
|||
} |
|||
|
|||
if (deviceConnectivityConfiguration.isEnabled(COAPS)) { |
|||
Optional.ofNullable(getCoapPublishCommand(COAPS, baseUrl, deviceCredentials)) |
|||
.ifPresent(v -> coapCommands.put(COAPS, v)); |
|||
|
|||
Optional.ofNullable(getDockerCoapPublishCommand(COAPS, baseUrl, deviceCredentials)) |
|||
.ifPresent(v -> dockerCoapCommands.put(COAPS, v)); |
|||
} |
|||
|
|||
if (!dockerCoapCommands.isEmpty()) { |
|||
coapCommands.set(DOCKER, dockerCoapCommands); |
|||
} |
|||
|
|||
return coapCommands.isEmpty() ? null : coapCommands; |
|||
} |
|||
|
|||
private String getCoapPublishCommand(String protocol, String baseUrl, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
DeviceConnectivityInfo properties = deviceConnectivityConfiguration.getConnectivity().get(protocol); |
|||
String hostName = getHost(baseUrl, properties); |
|||
String port = properties.getPort().isEmpty() ? "" : ":" + properties.getPort(); |
|||
return DeviceConnectivityUtil.getCoapPublishCommand(protocol, hostName, port, deviceCredentials); |
|||
} |
|||
|
|||
private String getDockerCoapPublishCommand(String protocol, String baseUrl, DeviceCredentials deviceCredentials) throws URISyntaxException { |
|||
DeviceConnectivityInfo properties = deviceConnectivityConfiguration.getConnectivity().get(protocol); |
|||
String host = getHost(baseUrl, properties); |
|||
String port = properties.getPort().isEmpty() ? "" : ":" + properties.getPort(); |
|||
return DeviceConnectivityUtil.getDockerCoapPublishCommand(protocol, host, port, deviceCredentials); |
|||
} |
|||
|
|||
private String getHost(String baseUrl, DeviceConnectivityInfo properties) throws URISyntaxException { |
|||
return properties.getHost().isEmpty() ? new URI(baseUrl).getHost() : properties.getHost(); |
|||
} |
|||
} |
|||
@ -0,0 +1,123 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.util; |
|||
|
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; |
|||
import org.thingsboard.server.common.data.security.DeviceCredentials; |
|||
|
|||
public class DeviceConnectivityUtil { |
|||
|
|||
public static final String HTTP = "http"; |
|||
public static final String HTTPS = "https"; |
|||
public static final String MQTT = "mqtt"; |
|||
public static final String LINUX = "linux"; |
|||
public static final String WINDOWS = "windows"; |
|||
public static final String DOCKER = "docker"; |
|||
public static final String MQTTS = "mqtts"; |
|||
public static final String COAP = "coap"; |
|||
public static final String COAPS = "coaps"; |
|||
public static final String PEM_CERT_FILE_NAME = "tb-server-chain.pem"; |
|||
public static final String CHECK_DOCUMENTATION = "Check documentation"; |
|||
public static final String JSON_EXAMPLE_PAYLOAD = "\"{temperature:25}\""; |
|||
public static final String DOCKER_RUN = "docker run --rm -it "; |
|||
public static final String MQTT_IMAGE = "thingsboard/mosquitto-clients "; |
|||
public static final String COAP_IMAGE = "thingsboard/coap-clients "; |
|||
|
|||
public static String getHttpPublishCommand(String protocol, String host, String port, DeviceCredentials deviceCredentials) { |
|||
return String.format("curl -v -X POST %s://%s%s/api/v1/%s/telemetry --header Content-Type:application/json --data " + JSON_EXAMPLE_PAYLOAD, |
|||
protocol, host, port, deviceCredentials.getCredentialsId()); |
|||
} |
|||
|
|||
public static String getMqttPublishCommand(String protocol, String host, String port, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) { |
|||
StringBuilder command = new StringBuilder("mosquitto_pub -d -q 1"); |
|||
if (MQTTS.equals(protocol)) { |
|||
command.append(" --cafile ").append(PEM_CERT_FILE_NAME); |
|||
} |
|||
command.append(" -h ").append(host).append(port == null ? "" : " -p " + port); |
|||
command.append(" -t ").append(deviceTelemetryTopic); |
|||
|
|||
switch (deviceCredentials.getCredentialsType()) { |
|||
case ACCESS_TOKEN: |
|||
command.append(" -u ").append(deviceCredentials.getCredentialsId()); |
|||
break; |
|||
case MQTT_BASIC: |
|||
BasicMqttCredentials credentials = JacksonUtil.fromString(deviceCredentials.getCredentialsValue(), |
|||
BasicMqttCredentials.class); |
|||
if (credentials != null) { |
|||
if (credentials.getClientId() != null) { |
|||
command.append(" -i ").append(credentials.getClientId()); |
|||
} |
|||
if (credentials.getUserName() != null) { |
|||
command.append(" -u ").append(credentials.getUserName()); |
|||
} |
|||
if (credentials.getPassword() != null) { |
|||
command.append(" -P ").append(credentials.getPassword()); |
|||
} |
|||
} else { |
|||
return null; |
|||
} |
|||
break; |
|||
default: |
|||
return null; |
|||
} |
|||
command.append(" -m " + JSON_EXAMPLE_PAYLOAD); |
|||
return command.toString(); |
|||
} |
|||
|
|||
public static String getDockerMqttPublishCommand(String protocol, String baseUrl, String host, String port, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) { |
|||
String mqttCommand = getMqttPublishCommand(protocol, host, port, deviceTelemetryTopic, deviceCredentials); |
|||
|
|||
if (mqttCommand == null) { |
|||
return null; |
|||
} |
|||
|
|||
StringBuilder mqttDockerCommand = new StringBuilder(); |
|||
mqttDockerCommand.append(DOCKER_RUN).append(MQTT_IMAGE); |
|||
|
|||
if (MQTTS.equals(protocol)) { |
|||
mqttDockerCommand.append("/bin/sh -c \"") |
|||
.append(getCurlPemCertCommand(baseUrl, protocol)) |
|||
.append(" && ") |
|||
.append(mqttCommand) |
|||
.append("\""); |
|||
} else { |
|||
mqttDockerCommand.append(mqttCommand); |
|||
} |
|||
|
|||
return mqttDockerCommand.toString(); |
|||
} |
|||
|
|||
public static String getCurlPemCertCommand(String baseUrl, String protocol) { |
|||
return String.format("curl -f -S -o %s %s/api/device-connectivity/%s/certificate/download", PEM_CERT_FILE_NAME, baseUrl, protocol); |
|||
} |
|||
|
|||
public static String getCoapPublishCommand(String protocol, String host, String port, DeviceCredentials deviceCredentials) { |
|||
switch (deviceCredentials.getCredentialsType()) { |
|||
case ACCESS_TOKEN: |
|||
String client = COAPS.equals(protocol) ? "coap-client-openssl" : "coap-client"; |
|||
return String.format("%s -m POST %s://%s%s/api/v1/%s/telemetry -t json -e %s", |
|||
client, protocol, host, port, deviceCredentials.getCredentialsId(), JSON_EXAMPLE_PAYLOAD); |
|||
default: |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
public static String getDockerCoapPublishCommand(String protocol, String host, String port, DeviceCredentials deviceCredentials) { |
|||
String coapCommand = getCoapPublishCommand(protocol, host, port, deviceCredentials); |
|||
return coapCommand != null ? String.format("%s%s%s", DOCKER_RUN, COAP_IMAGE, coapCommand) : null; |
|||
} |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue