167 changed files with 6529 additions and 2295 deletions
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -0,0 +1,65 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.instructions; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.Setter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.thingsboard.server.service.install.InstallScripts; |
|||
|
|||
import java.io.IOException; |
|||
import java.nio.file.Files; |
|||
import java.nio.file.Path; |
|||
import java.nio.file.Paths; |
|||
|
|||
@Slf4j |
|||
@RequiredArgsConstructor |
|||
public abstract class BaseEdgeInstallUpgradeInstructionsService { |
|||
|
|||
private static final String EDGE_DIR = "edge"; |
|||
private static final String INSTRUCTIONS_DIR = "instructions"; |
|||
|
|||
private final InstallScripts installScripts; |
|||
|
|||
@Value("${app.version:unknown}") |
|||
@Setter |
|||
protected String appVersion; |
|||
|
|||
protected String readFile(Path file) { |
|||
try { |
|||
return Files.readString(file); |
|||
} catch (IOException e) { |
|||
log.warn("Failed to read file: {}", file, e); |
|||
throw new RuntimeException(e); |
|||
} |
|||
} |
|||
|
|||
protected String getTagVersion(String version) { |
|||
return version.endsWith(".0") ? version.substring(0, version.length() - 2) : version; |
|||
} |
|||
|
|||
protected Path resolveFile(String subDir, String... subDirs) { |
|||
return getEdgeInstructionsDir().resolve(Paths.get(subDir, subDirs)); |
|||
} |
|||
|
|||
protected Path getEdgeInstructionsDir() { |
|||
return Paths.get(installScripts.getDataDir(), InstallScripts.JSON_DIR, EDGE_DIR, INSTRUCTIONS_DIR, getBaseDirName()); |
|||
} |
|||
|
|||
protected abstract String getBaseDirName(); |
|||
|
|||
} |
|||
@ -1,478 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.sync.ie; |
|||
|
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import com.fasterxml.jackson.databind.node.TextNode; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.rule.engine.debug.TbMsgGeneratorNode; |
|||
import org.thingsboard.rule.engine.debug.TbMsgGeneratorNodeConfiguration; |
|||
import org.thingsboard.rule.engine.metadata.TbGetAttributesNode; |
|||
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; |
|||
import org.thingsboard.server.common.data.Customer; |
|||
import org.thingsboard.server.common.data.Dashboard; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.DeviceProfile; |
|||
import org.thingsboard.server.common.data.DeviceProfileType; |
|||
import org.thingsboard.server.common.data.DeviceTransportType; |
|||
import org.thingsboard.server.common.data.EntityView; |
|||
import org.thingsboard.server.common.data.ExportableEntity; |
|||
import org.thingsboard.server.common.data.HasTenantId; |
|||
import org.thingsboard.server.common.data.OtaPackage; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.User; |
|||
import org.thingsboard.server.common.data.asset.Asset; |
|||
import org.thingsboard.server.common.data.asset.AssetProfile; |
|||
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; |
|||
import org.thingsboard.server.common.data.device.data.DeviceData; |
|||
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; |
|||
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration; |
|||
import org.thingsboard.server.common.data.device.profile.DeviceProfileData; |
|||
import org.thingsboard.server.common.data.id.AssetId; |
|||
import org.thingsboard.server.common.data.id.AssetProfileId; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.DashboardId; |
|||
import org.thingsboard.server.common.data.id.DeviceProfileId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|||
import org.thingsboard.server.common.data.ota.ChecksumAlgorithm; |
|||
import org.thingsboard.server.common.data.ota.OtaPackageType; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.relation.RelationTypeGroup; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.common.data.rule.RuleChainType; |
|||
import org.thingsboard.server.common.data.rule.RuleNode; |
|||
import org.thingsboard.server.common.data.security.Authority; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityExportData; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityImportResult; |
|||
import org.thingsboard.server.common.data.sync.ie.EntityImportSettings; |
|||
import org.thingsboard.server.common.data.util.ThrowingRunnable; |
|||
import org.thingsboard.server.controller.AbstractControllerTest; |
|||
import org.thingsboard.server.dao.asset.AssetProfileService; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.customer.CustomerService; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.device.DeviceProfileService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.entityview.EntityViewService; |
|||
import org.thingsboard.server.dao.ota.OtaPackageService; |
|||
import org.thingsboard.server.dao.relation.RelationService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.tenant.TenantService; |
|||
import org.thingsboard.server.service.security.model.SecurityUser; |
|||
import org.thingsboard.server.service.security.model.UserPrincipal; |
|||
import org.thingsboard.server.service.sync.vc.data.EntitiesImportCtx; |
|||
import org.thingsboard.server.service.sync.vc.data.SimpleEntitiesExportCtx; |
|||
|
|||
import java.nio.ByteBuffer; |
|||
import java.util.Arrays; |
|||
import java.util.Collections; |
|||
import java.util.UUID; |
|||
import java.util.function.Function; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
public abstract class BaseExportImportServiceTest extends AbstractControllerTest { |
|||
|
|||
@Autowired |
|||
protected EntitiesExportImportService exportImportService; |
|||
@Autowired |
|||
protected DeviceService deviceService; |
|||
@Autowired |
|||
protected OtaPackageService otaPackageService; |
|||
@Autowired |
|||
protected DeviceProfileService deviceProfileService; |
|||
@Autowired |
|||
protected AssetProfileService assetProfileService; |
|||
@Autowired |
|||
protected AssetService assetService; |
|||
@Autowired |
|||
protected CustomerService customerService; |
|||
@Autowired |
|||
protected RuleChainService ruleChainService; |
|||
@Autowired |
|||
protected DashboardService dashboardService; |
|||
@Autowired |
|||
protected RelationService relationService; |
|||
@Autowired |
|||
protected TenantService tenantService; |
|||
@Autowired |
|||
protected EntityViewService entityViewService; |
|||
|
|||
protected TenantId tenantId1; |
|||
protected User tenantAdmin1; |
|||
|
|||
protected TenantId tenantId2; |
|||
protected User tenantAdmin2; |
|||
|
|||
@Before |
|||
public void beforeEach() throws Exception { |
|||
loginSysAdmin(); |
|||
Tenant tenant1 = new Tenant(); |
|||
tenant1.setTitle("Tenant 1"); |
|||
tenant1.setEmail("tenant1@thingsboard.org"); |
|||
this.tenantId1 = tenantService.saveTenant(tenant1).getId(); |
|||
User tenantAdmin1 = new User(); |
|||
tenantAdmin1.setTenantId(tenantId1); |
|||
tenantAdmin1.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin1.setEmail("tenant1-admin@thingsboard.org"); |
|||
this.tenantAdmin1 = createUser(tenantAdmin1, "12345678"); |
|||
Tenant tenant2 = new Tenant(); |
|||
tenant2.setTitle("Tenant 2"); |
|||
tenant2.setEmail("tenant2@thingsboard.org"); |
|||
this.tenantId2 = tenantService.saveTenant(tenant2).getId(); |
|||
User tenantAdmin2 = new User(); |
|||
tenantAdmin2.setTenantId(tenantId2); |
|||
tenantAdmin2.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin2.setEmail("tenant2-admin@thingsboard.org"); |
|||
this.tenantAdmin2 = createUser(tenantAdmin2, "12345678"); |
|||
} |
|||
|
|||
@After |
|||
public void afterEach() { |
|||
tenantService.deleteTenant(tenantId1); |
|||
tenantService.deleteTenant(tenantId2); |
|||
} |
|||
|
|||
protected Device createDevice(TenantId tenantId, CustomerId customerId, DeviceProfileId deviceProfileId, String name) { |
|||
Device device = new Device(); |
|||
device.setTenantId(tenantId); |
|||
device.setCustomerId(customerId); |
|||
device.setName(name); |
|||
device.setLabel("lbl"); |
|||
device.setDeviceProfileId(deviceProfileId); |
|||
DeviceData deviceData = new DeviceData(); |
|||
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration()); |
|||
device.setDeviceData(deviceData); |
|||
return deviceService.saveDevice(device); |
|||
} |
|||
|
|||
protected OtaPackage createOtaPackage(TenantId tenantId, DeviceProfileId deviceProfileId, OtaPackageType type) { |
|||
OtaPackage otaPackage = new OtaPackage(); |
|||
otaPackage.setTenantId(tenantId); |
|||
otaPackage.setDeviceProfileId(deviceProfileId); |
|||
otaPackage.setType(type); |
|||
otaPackage.setTitle("My " + type); |
|||
otaPackage.setVersion("v1.0"); |
|||
otaPackage.setFileName("filename.txt"); |
|||
otaPackage.setContentType("text/plain"); |
|||
otaPackage.setChecksumAlgorithm(ChecksumAlgorithm.SHA256); |
|||
otaPackage.setChecksum("4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a"); |
|||
otaPackage.setDataSize(1L); |
|||
otaPackage.setData(ByteBuffer.wrap(new byte[]{(int) 1})); |
|||
return otaPackageService.saveOtaPackage(otaPackage); |
|||
} |
|||
|
|||
protected void checkImportedDeviceData(Device initialDevice, Device importedDevice) { |
|||
assertThat(importedDevice.getName()).isEqualTo(initialDevice.getName()); |
|||
assertThat(importedDevice.getType()).isEqualTo(initialDevice.getType()); |
|||
assertThat(importedDevice.getDeviceData()).isEqualTo(initialDevice.getDeviceData()); |
|||
assertThat(importedDevice.getLabel()).isEqualTo(initialDevice.getLabel()); |
|||
} |
|||
|
|||
protected DeviceProfile createDeviceProfile(TenantId tenantId, RuleChainId defaultRuleChainId, DashboardId defaultDashboardId, String name) { |
|||
DeviceProfile deviceProfile = new DeviceProfile(); |
|||
deviceProfile.setTenantId(tenantId); |
|||
deviceProfile.setName(name); |
|||
deviceProfile.setDescription("dscrptn"); |
|||
deviceProfile.setType(DeviceProfileType.DEFAULT); |
|||
deviceProfile.setTransportType(DeviceTransportType.DEFAULT); |
|||
deviceProfile.setDefaultRuleChainId(defaultRuleChainId); |
|||
deviceProfile.setDefaultDashboardId(defaultDashboardId); |
|||
DeviceProfileData profileData = new DeviceProfileData(); |
|||
profileData.setConfiguration(new DefaultDeviceProfileConfiguration()); |
|||
profileData.setTransportConfiguration(new DefaultDeviceProfileTransportConfiguration()); |
|||
deviceProfile.setProfileData(profileData); |
|||
return deviceProfileService.saveDeviceProfile(deviceProfile); |
|||
} |
|||
|
|||
protected void checkImportedDeviceProfileData(DeviceProfile initialProfile, DeviceProfile importedProfile) { |
|||
assertThat(initialProfile.getName()).isEqualTo(importedProfile.getName()); |
|||
assertThat(initialProfile.getType()).isEqualTo(importedProfile.getType()); |
|||
assertThat(initialProfile.getTransportType()).isEqualTo(importedProfile.getTransportType()); |
|||
assertThat(initialProfile.getProfileData()).isEqualTo(importedProfile.getProfileData()); |
|||
assertThat(initialProfile.getDescription()).isEqualTo(importedProfile.getDescription()); |
|||
} |
|||
|
|||
protected AssetProfile createAssetProfile(TenantId tenantId, RuleChainId defaultRuleChainId, DashboardId defaultDashboardId, String name) { |
|||
AssetProfile assetProfile = new AssetProfile(); |
|||
assetProfile.setTenantId(tenantId); |
|||
assetProfile.setName(name); |
|||
assetProfile.setDescription("dscrptn"); |
|||
assetProfile.setDefaultRuleChainId(defaultRuleChainId); |
|||
assetProfile.setDefaultDashboardId(defaultDashboardId); |
|||
return assetProfileService.saveAssetProfile(assetProfile); |
|||
} |
|||
|
|||
protected void checkImportedAssetProfileData(AssetProfile initialProfile, AssetProfile importedProfile) { |
|||
assertThat(initialProfile.getName()).isEqualTo(importedProfile.getName()); |
|||
assertThat(initialProfile.getDescription()).isEqualTo(importedProfile.getDescription()); |
|||
} |
|||
|
|||
protected Asset createAsset(TenantId tenantId, CustomerId customerId, AssetProfileId assetProfileId, String name) { |
|||
Asset asset = new Asset(); |
|||
asset.setTenantId(tenantId); |
|||
asset.setCustomerId(customerId); |
|||
asset.setAssetProfileId(assetProfileId); |
|||
asset.setName(name); |
|||
asset.setLabel("lbl"); |
|||
asset.setAdditionalInfo(JacksonUtil.newObjectNode().set("a", new TextNode("b"))); |
|||
return assetService.saveAsset(asset); |
|||
} |
|||
|
|||
protected void checkImportedAssetData(Asset initialAsset, Asset importedAsset) { |
|||
assertThat(importedAsset.getName()).isEqualTo(initialAsset.getName()); |
|||
assertThat(importedAsset.getType()).isEqualTo(initialAsset.getType()); |
|||
assertThat(importedAsset.getLabel()).isEqualTo(initialAsset.getLabel()); |
|||
assertThat(importedAsset.getAdditionalInfo()).isEqualTo(initialAsset.getAdditionalInfo()); |
|||
} |
|||
|
|||
protected Customer createCustomer(TenantId tenantId, String name) { |
|||
Customer customer = new Customer(); |
|||
customer.setTenantId(tenantId); |
|||
customer.setTitle(name); |
|||
customer.setCountry("ua"); |
|||
customer.setAddress("abb"); |
|||
customer.setEmail("ccc@aa.org"); |
|||
customer.setAdditionalInfo(JacksonUtil.newObjectNode().set("a", new TextNode("b"))); |
|||
return customerService.saveCustomer(customer); |
|||
} |
|||
|
|||
protected void checkImportedCustomerData(Customer initialCustomer, Customer importedCustomer) { |
|||
assertThat(importedCustomer.getTitle()).isEqualTo(initialCustomer.getTitle()); |
|||
assertThat(importedCustomer.getCountry()).isEqualTo(initialCustomer.getCountry()); |
|||
assertThat(importedCustomer.getAddress()).isEqualTo(initialCustomer.getAddress()); |
|||
assertThat(importedCustomer.getEmail()).isEqualTo(initialCustomer.getEmail()); |
|||
} |
|||
|
|||
protected Dashboard createDashboard(TenantId tenantId, CustomerId customerId, String name) { |
|||
Dashboard dashboard = new Dashboard(); |
|||
dashboard.setTenantId(tenantId); |
|||
dashboard.setTitle(name); |
|||
dashboard.setConfiguration(JacksonUtil.newObjectNode().set("a", new TextNode("b"))); |
|||
dashboard.setImage("abvregewrg"); |
|||
dashboard.setMobileHide(true); |
|||
dashboard = dashboardService.saveDashboard(dashboard); |
|||
if (customerId != null) { |
|||
dashboardService.assignDashboardToCustomer(tenantId, dashboard.getId(), customerId); |
|||
return dashboardService.findDashboardById(tenantId, dashboard.getId()); |
|||
} |
|||
return dashboard; |
|||
} |
|||
|
|||
protected Dashboard createDashboard(TenantId tenantId, CustomerId customerId, String name, AssetId assetForEntityAlias) { |
|||
Dashboard dashboard = createDashboard(tenantId, customerId, name); |
|||
String entityAliases = "{\n" + |
|||
"\t\"23c4185d-1497-9457-30b2-6d91e69a5b2c\": {\n" + |
|||
"\t\t\"alias\": \"assets\",\n" + |
|||
"\t\t\"filter\": {\n" + |
|||
"\t\t\t\"entityList\": [\n" + |
|||
"\t\t\t\t\"" + assetForEntityAlias.getId().toString() + "\"\n" + |
|||
"\t\t\t],\n" + |
|||
"\t\t\t\"entityType\": \"ASSET\",\n" + |
|||
"\t\t\t\"resolveMultiple\": true,\n" + |
|||
"\t\t\t\"type\": \"entityList\"\n" + |
|||
"\t\t},\n" + |
|||
"\t\t\"id\": \"23c4185d-1497-9457-30b2-6d91e69a5b2c\"\n" + |
|||
"\t}\n" + |
|||
"}"; |
|||
ObjectNode dashboardConfiguration = JacksonUtil.newObjectNode(); |
|||
dashboardConfiguration.set("entityAliases", JacksonUtil.toJsonNode(entityAliases)); |
|||
dashboardConfiguration.set("description", new TextNode("hallo")); |
|||
dashboard.setConfiguration(dashboardConfiguration); |
|||
return dashboardService.saveDashboard(dashboard); |
|||
} |
|||
|
|||
protected void checkImportedDashboardData(Dashboard initialDashboard, Dashboard importedDashboard) { |
|||
assertThat(importedDashboard.getTitle()).isEqualTo(initialDashboard.getTitle()); |
|||
assertThat(importedDashboard.getConfiguration()).isEqualTo(initialDashboard.getConfiguration()); |
|||
assertThat(importedDashboard.getImage()).isEqualTo(initialDashboard.getImage()); |
|||
assertThat(importedDashboard.isMobileHide()).isEqualTo(initialDashboard.isMobileHide()); |
|||
if (initialDashboard.getAssignedCustomers() != null) { |
|||
assertThat(importedDashboard.getAssignedCustomers()).containsAll(initialDashboard.getAssignedCustomers()); |
|||
} |
|||
} |
|||
protected RuleChain createRuleChain(TenantId tenantId, String name, EntityId originatorId) { |
|||
RuleChain ruleChain = new RuleChain(); |
|||
ruleChain.setTenantId(tenantId); |
|||
ruleChain.setName(name); |
|||
ruleChain.setType(RuleChainType.CORE); |
|||
ruleChain.setDebugMode(true); |
|||
ruleChain.setConfiguration(JacksonUtil.newObjectNode().set("a", new TextNode("b"))); |
|||
ruleChain = ruleChainService.saveRuleChain(ruleChain); |
|||
|
|||
RuleChainMetaData metaData = new RuleChainMetaData(); |
|||
metaData.setRuleChainId(ruleChain.getId()); |
|||
|
|||
RuleNode ruleNode1 = new RuleNode(); |
|||
ruleNode1.setName("Generator 1"); |
|||
ruleNode1.setType(TbMsgGeneratorNode.class.getName()); |
|||
ruleNode1.setDebugMode(true); |
|||
TbMsgGeneratorNodeConfiguration configuration1 = new TbMsgGeneratorNodeConfiguration(); |
|||
configuration1.setOriginatorType(originatorId.getEntityType()); |
|||
configuration1.setOriginatorId(originatorId.getId().toString()); |
|||
ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1)); |
|||
|
|||
RuleNode ruleNode2 = new RuleNode(); |
|||
ruleNode2.setName("Simple Rule Node 2"); |
|||
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); |
|||
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); |
|||
ruleNode2.setDebugMode(true); |
|||
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); |
|||
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); |
|||
ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2)); |
|||
|
|||
metaData.setNodes(Arrays.asList(ruleNode1, ruleNode2)); |
|||
metaData.setFirstNodeIndex(0); |
|||
metaData.addConnectionInfo(0, 1, TbNodeConnectionType.SUCCESS); |
|||
ruleChainService.saveRuleChainMetaData(tenantId, metaData, Function.identity()); |
|||
|
|||
return ruleChainService.findRuleChainById(tenantId, ruleChain.getId()); |
|||
} |
|||
|
|||
protected RuleChain createRuleChain(TenantId tenantId, String name) { |
|||
RuleChain ruleChain = new RuleChain(); |
|||
ruleChain.setTenantId(tenantId); |
|||
ruleChain.setName(name); |
|||
ruleChain.setType(RuleChainType.CORE); |
|||
ruleChain.setDebugMode(true); |
|||
ruleChain.setConfiguration(JacksonUtil.newObjectNode().set("a", new TextNode("b"))); |
|||
ruleChain = ruleChainService.saveRuleChain(ruleChain); |
|||
|
|||
RuleChainMetaData metaData = new RuleChainMetaData(); |
|||
metaData.setRuleChainId(ruleChain.getId()); |
|||
|
|||
RuleNode ruleNode1 = new RuleNode(); |
|||
ruleNode1.setName("Simple Rule Node 1"); |
|||
ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); |
|||
ruleNode1.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); |
|||
ruleNode1.setDebugMode(true); |
|||
TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration(); |
|||
configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1")); |
|||
ruleNode1.setConfiguration(JacksonUtil.valueToTree(configuration1)); |
|||
|
|||
RuleNode ruleNode2 = new RuleNode(); |
|||
ruleNode2.setName("Simple Rule Node 2"); |
|||
ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName()); |
|||
ruleNode2.setConfigurationVersion(TbGetAttributesNode.class.getAnnotation(org.thingsboard.rule.engine.api.RuleNode.class).version()); |
|||
ruleNode2.setDebugMode(true); |
|||
TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration(); |
|||
configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2")); |
|||
ruleNode2.setConfiguration(JacksonUtil.valueToTree(configuration2)); |
|||
|
|||
metaData.setNodes(Arrays.asList(ruleNode1, ruleNode2)); |
|||
metaData.setFirstNodeIndex(0); |
|||
metaData.addConnectionInfo(0, 1, TbNodeConnectionType.SUCCESS); |
|||
ruleChainService.saveRuleChainMetaData(tenantId, metaData, Function.identity()); |
|||
|
|||
return ruleChainService.findRuleChainById(tenantId, ruleChain.getId()); |
|||
} |
|||
|
|||
protected void checkImportedRuleChainData(RuleChain initialRuleChain, RuleChainMetaData initialMetaData, RuleChain importedRuleChain, RuleChainMetaData importedMetaData) { |
|||
assertThat(importedRuleChain.getType()).isEqualTo(initialRuleChain.getType()); |
|||
assertThat(importedRuleChain.getName()).isEqualTo(initialRuleChain.getName()); |
|||
assertThat(importedRuleChain.isDebugMode()).isEqualTo(initialRuleChain.isDebugMode()); |
|||
assertThat(importedRuleChain.getConfiguration()).isEqualTo(initialRuleChain.getConfiguration()); |
|||
|
|||
assertThat(importedMetaData.getConnections()).isEqualTo(initialMetaData.getConnections()); |
|||
assertThat(importedMetaData.getFirstNodeIndex()).isEqualTo(initialMetaData.getFirstNodeIndex()); |
|||
for (int i = 0; i < initialMetaData.getNodes().size(); i++) { |
|||
RuleNode initialNode = initialMetaData.getNodes().get(i); |
|||
RuleNode importedNode = importedMetaData.getNodes().get(i); |
|||
assertThat(importedNode.getRuleChainId()).isEqualTo(importedRuleChain.getId()); |
|||
assertThat(importedNode.getName()).isEqualTo(initialNode.getName()); |
|||
assertThat(importedNode.getType()).isEqualTo(initialNode.getType()); |
|||
assertThat(importedNode.getConfiguration()).isEqualTo(initialNode.getConfiguration()); |
|||
assertThat(importedNode.getAdditionalInfo()).isEqualTo(initialNode.getAdditionalInfo()); |
|||
} |
|||
} |
|||
|
|||
protected EntityView createEntityView(TenantId tenantId, CustomerId customerId, EntityId entityId, String name) { |
|||
EntityView entityView = new EntityView(); |
|||
entityView.setTenantId(tenantId); |
|||
entityView.setEntityId(entityId); |
|||
entityView.setCustomerId(customerId); |
|||
entityView.setName(name); |
|||
entityView.setType("A"); |
|||
return entityViewService.saveEntityView(entityView); |
|||
} |
|||
|
|||
protected EntityRelation createRelation(EntityId from, EntityId to) { |
|||
EntityRelation relation = new EntityRelation(); |
|||
relation.setFrom(from); |
|||
relation.setTo(to); |
|||
relation.setType(EntityRelation.MANAGES_TYPE); |
|||
relation.setAdditionalInfo(JacksonUtil.newObjectNode().set("a", new TextNode("b"))); |
|||
relation.setTypeGroup(RelationTypeGroup.COMMON); |
|||
relationService.saveRelation(TenantId.SYS_TENANT_ID, relation); |
|||
return relation; |
|||
} |
|||
|
|||
protected <E extends ExportableEntity<?> & HasTenantId> void checkImportedEntity(TenantId tenantId1, E initialEntity, TenantId tenantId2, E importedEntity) { |
|||
assertThat(initialEntity.getTenantId()).isEqualTo(tenantId1); |
|||
assertThat(importedEntity.getTenantId()).isEqualTo(tenantId2); |
|||
|
|||
assertThat(importedEntity.getExternalId()).isEqualTo(initialEntity.getId()); |
|||
|
|||
boolean sameTenant = tenantId1.equals(tenantId2); |
|||
if (!sameTenant) { |
|||
assertThat(importedEntity.getId()).isNotEqualTo(initialEntity.getId()); |
|||
} else { |
|||
assertThat(importedEntity.getId()).isEqualTo(initialEntity.getId()); |
|||
} |
|||
} |
|||
|
|||
|
|||
protected <E extends ExportableEntity<I>, I extends EntityId> EntityExportData<E> exportEntity(User user, I entityId) throws Exception { |
|||
return exportEntity(user, entityId, EntityExportSettings.builder() |
|||
.exportCredentials(true) |
|||
.build()); |
|||
} |
|||
|
|||
protected <E extends ExportableEntity<I>, I extends EntityId> EntityExportData<E> exportEntity(User user, I entityId, EntityExportSettings exportSettings) throws Exception { |
|||
return exportImportService.exportEntity(new SimpleEntitiesExportCtx(getSecurityUser(user), null, null, exportSettings), entityId); |
|||
} |
|||
|
|||
protected <E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(User user, EntityExportData<E> exportData) throws Exception { |
|||
return importEntity(user, exportData, EntityImportSettings.builder() |
|||
.saveCredentials(true) |
|||
.build()); |
|||
} |
|||
|
|||
protected <E extends ExportableEntity<I>, I extends EntityId> EntityImportResult<E> importEntity(User user, EntityExportData<E> exportData, EntityImportSettings importSettings) throws Exception { |
|||
EntitiesImportCtx ctx = new EntitiesImportCtx(UUID.randomUUID(), getSecurityUser(user), null, importSettings); |
|||
ctx.setFinalImportAttempt(true); |
|||
exportData = JacksonUtil.treeToValue(JacksonUtil.valueToTree(exportData), EntityExportData.class); |
|||
EntityImportResult<E> importResult = exportImportService.importEntity(ctx, exportData); |
|||
exportImportService.saveReferencesAndRelations(ctx); |
|||
for (ThrowingRunnable throwingRunnable : ctx.getEventCallbacks()) { |
|||
throwingRunnable.run(); |
|||
} |
|||
return importResult; |
|||
} |
|||
|
|||
protected SecurityUser getSecurityUser(User user) { |
|||
return new SecurityUser(user, true, new UserPrincipal(UserPrincipal.Type.USER_NAME, user.getEmail())); |
|||
} |
|||
|
|||
} |
|||
File diff suppressed because it is too large
File diff suppressed because one or more lines are too long
@ -0,0 +1,273 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.EnumSource; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.server.common.data.AttributeScope; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.EntityView; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityViewId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|||
import org.thingsboard.server.common.data.objects.AttributesEntityView; |
|||
import org.thingsboard.server.common.data.objects.TelemetryEntityView; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
import org.thingsboard.server.dao.entityview.EntityViewService; |
|||
|
|||
import java.time.Instant; |
|||
import java.time.temporal.ChronoUnit; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
|
|||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.anyList; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.Mockito.doAnswer; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.mockito.Mockito.verifyNoMoreInteractions; |
|||
import static org.mockito.Mockito.when; |
|||
import static org.thingsboard.server.common.data.msg.TbMsgType.ACTIVITY_EVENT; |
|||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; |
|||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED; |
|||
import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT; |
|||
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class TbCopyAttributesToEntityViewNodeTest { |
|||
|
|||
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("9fdb1f05-dc66-4960-9263-ae195f1b4533")); |
|||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("1d453dc9-9333-476a-a51f-093cf2176e59")); |
|||
private final EntityViewId ENTITY_VIEW_ID = new EntityViewId(UUID.fromString("65636806-453d-4bb4-b513-92b833970753")); |
|||
|
|||
private final AttributesEntityView CLIENT_ATTRIBUTES = new AttributesEntityView(List.of("clientAttribute1"), Collections.emptyList(), Collections.emptyList()); |
|||
private final AttributesEntityView SERVER_ATTRIBUTES = new AttributesEntityView(Collections.emptyList(), List.of("serverAttribute1"), Collections.emptyList()); |
|||
private final AttributesEntityView SHARED_ATTRIBUTES = new AttributesEntityView(Collections.emptyList(), Collections.emptyList(), List.of("sharedAttribute1")); |
|||
|
|||
private final TelemetryEntityView CLIENT_TELEMETRY_ENTITY_VIEW = new TelemetryEntityView(Collections.emptyList(), CLIENT_ATTRIBUTES); |
|||
private final TelemetryEntityView SERVER_TELEMETRY_ENTITY_VIEW = new TelemetryEntityView(Collections.emptyList(), SERVER_ATTRIBUTES); |
|||
private final TelemetryEntityView SHARED_TELEMETRY_ENTITY_VIEW = new TelemetryEntityView(Collections.emptyList(), SHARED_ATTRIBUTES); |
|||
|
|||
private final long ENTITY_VIEW_START_TS = Instant.now().minus(1, ChronoUnit.DAYS).toEpochMilli(); |
|||
private final long ENTITY_VIEW_END_TS = Instant.now().plus(1, ChronoUnit.DAYS).toEpochMilli(); |
|||
|
|||
private TbCopyAttributesToEntityViewNode node; |
|||
|
|||
@Mock |
|||
private TbContext ctxMock; |
|||
@Mock |
|||
private EntityViewService entityViewServiceMock; |
|||
@Mock |
|||
private RuleEngineTelemetryService telemetryServiceMock; |
|||
|
|||
@BeforeEach |
|||
void setUp() throws TbNodeException { |
|||
node = new TbCopyAttributesToEntityViewNode(); |
|||
var config = new EmptyNodeConfiguration().defaultConfiguration(); |
|||
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctxMock, configuration); |
|||
} |
|||
|
|||
@Test |
|||
public void givenExistingClientAttributes_whenOnMsg_thenCopyAttributesToView() { |
|||
EntityView entityView = getEntityView(CLIENT_TELEMETRY_ENTITY_VIEW); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_ATTRIBUTES_REQUEST, DEVICE_ID, |
|||
new TbMsgMetaData(Map.of(DataConstants.SCOPE, AttributeScope.SERVER_SCOPE.name())), |
|||
"{\"clientAttribute1\": 100, \"clientAttribute2\": \"value2\"}"); |
|||
|
|||
mockEntityViewLookup(entityView); |
|||
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); |
|||
doAnswer(invocation -> { |
|||
FutureCallback<Void> callback = invocation.getArgument(4); |
|||
callback.onSuccess(null); |
|||
return null; |
|||
}).when(telemetryServiceMock).saveAndNotify(any(), any(), any(AttributeScope.class), anyList(), any(FutureCallback.class)); |
|||
TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId()); |
|||
// TODO: use newMsg() with any(TbMsgType.class), replace in other tests as well.
|
|||
doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any()); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); |
|||
ArgumentCaptor<List<AttributeKvEntry>> filteredAttributesCaptor = ArgumentCaptor.forClass(List.class); |
|||
verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), eq(ENTITY_VIEW_ID), eq(AttributeScope.CLIENT_SCOPE), |
|||
filteredAttributesCaptor.capture(), any(FutureCallback.class)); |
|||
List<AttributeKvEntry> filteredAttributesCaptorValue = filteredAttributesCaptor.getValue(); |
|||
assertThat(filteredAttributesCaptorValue.size()).isEqualTo(1); |
|||
assertThat(filteredAttributesCaptorValue.get(0).getKey()).isEqualTo("clientAttribute1"); |
|||
assertThat(filteredAttributesCaptorValue.get(0).getValue()).isEqualTo(100L); |
|||
verify(ctxMock).ack(eq(msg)); |
|||
verify(ctxMock).enqueueForTellNext(eq(newMsg), eq(TbNodeConnectionType.SUCCESS)); |
|||
verifyNoMoreInteractions(ctxMock, entityViewServiceMock, telemetryServiceMock); |
|||
} |
|||
|
|||
@Test |
|||
public void givenExistingServerAttributesAndMsgTypeAttributesDeleted_whenOnMsg_thenDeleteAttributesFromView() { |
|||
EntityView entityView = getEntityView(SERVER_TELEMETRY_ENTITY_VIEW); |
|||
|
|||
TbMsg msg = TbMsg.newMsg( |
|||
ATTRIBUTES_DELETED, DEVICE_ID, new TbMsgMetaData(Map.of(DataConstants.SCOPE, AttributeScope.SERVER_SCOPE.name())), |
|||
"{\"attributes\": [\"serverAttribute1\"]}"); |
|||
|
|||
mockEntityViewLookup(entityView); |
|||
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); |
|||
doAnswer(invocation -> { |
|||
FutureCallback<Void> callback = invocation.getArgument(4); |
|||
callback.onSuccess(null); |
|||
return null; |
|||
}).when(telemetryServiceMock).deleteAndNotify(any(), any(), any(AttributeScope.class), anyList(), any(FutureCallback.class)); |
|||
TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId()); |
|||
doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any()); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); |
|||
ArgumentCaptor<List<String>> filteredAttributesCaptor = ArgumentCaptor.forClass(List.class); |
|||
verify(telemetryServiceMock).deleteAndNotify(eq(TENANT_ID), eq(ENTITY_VIEW_ID), eq(AttributeScope.SERVER_SCOPE), filteredAttributesCaptor.capture(), any(FutureCallback.class)); |
|||
List<String> filteredAttributesCaptorValue = filteredAttributesCaptor.getValue(); |
|||
assertThat(filteredAttributesCaptorValue.size()).isEqualTo(1); |
|||
assertThat(filteredAttributesCaptorValue.get(0)).isEqualTo("serverAttribute1"); |
|||
verify(ctxMock).ack(eq(msg)); |
|||
verify(ctxMock).enqueueForTellNext(eq(newMsg), eq(TbNodeConnectionType.SUCCESS)); |
|||
verifyNoMoreInteractions(ctxMock, entityViewServiceMock, telemetryServiceMock); |
|||
} |
|||
|
|||
@Test |
|||
public void givenNonMatchedSharedAttributesAndMsgTypeIsAttributesDeleted_whenOnMsg_thenNoAttributesDeleteFromView() { |
|||
EntityView entityView = getEntityView(SHARED_TELEMETRY_ENTITY_VIEW); |
|||
|
|||
TbMsg msg = TbMsg.newMsg( |
|||
TbMsgType.ATTRIBUTES_DELETED, DEVICE_ID, new TbMsgMetaData(Map.of(DataConstants.SCOPE, AttributeScope.SHARED_SCOPE.name())), |
|||
"{\"attributes\": [\"anotherAttribute\"]}"); |
|||
|
|||
mockEntityViewLookup(entityView); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); |
|||
verify(ctxMock).ack(eq(msg)); |
|||
verifyNoMoreInteractions(ctxMock, entityViewServiceMock); |
|||
} |
|||
|
|||
@Test |
|||
public void givenNonMatchedAttributesAndMsgTypeIsPostAttributesRequest_whenOnMsg_thenCopyNoAttributesToView() { |
|||
EntityView entityView = getEntityView(CLIENT_TELEMETRY_ENTITY_VIEW); |
|||
|
|||
TbMsg msg = TbMsg.newMsg( |
|||
TbMsgType.POST_ATTRIBUTES_REQUEST, DEVICE_ID, new TbMsgMetaData(Map.of(DataConstants.SCOPE, AttributeScope.SERVER_SCOPE.name())), |
|||
"{\"clientAttribute2\": \"value2\"}"); |
|||
|
|||
mockEntityViewLookup(entityView); |
|||
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); |
|||
doAnswer(invocation -> { |
|||
FutureCallback<Void> callback = invocation.getArgument(4); |
|||
callback.onSuccess(null); |
|||
return null; |
|||
}).when(telemetryServiceMock).saveAndNotify(any(), any(), any(AttributeScope.class), anyList(), any(FutureCallback.class)); |
|||
TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId()); |
|||
doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any()); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); |
|||
verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), eq(ENTITY_VIEW_ID), eq(AttributeScope.CLIENT_SCOPE), eq(Collections.emptyList()), any(FutureCallback.class)); |
|||
verify(ctxMock).ack(eq(msg)); |
|||
verify(ctxMock).enqueueForTellNext(eq(newMsg), eq(TbNodeConnectionType.SUCCESS)); |
|||
verifyNoMoreInteractions(ctxMock, entityViewServiceMock, telemetryServiceMock); |
|||
} |
|||
|
|||
@Test |
|||
public void givenAttributesValidityPeriodOutOfStartDateAndEndDate_whenOnMsg_thenDoNothing() { |
|||
EntityView entityView = getEntityView( |
|||
SERVER_TELEMETRY_ENTITY_VIEW, |
|||
Instant.now().minus(2, ChronoUnit.DAYS).toEpochMilli(), |
|||
Instant.now().minus(1, ChronoUnit.DAYS).toEpochMilli() |
|||
); |
|||
mockEntityViewLookup(entityView); |
|||
|
|||
TbMsg msg = TbMsg.newMsg( |
|||
ATTRIBUTES_DELETED, DEVICE_ID, new TbMsgMetaData(Map.of(DataConstants.SCOPE, AttributeScope.SERVER_SCOPE.name())), |
|||
"{\"attributes\": [\"serverAttribute1\"]}"); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); |
|||
verify(ctxMock).ack(eq(msg)); |
|||
verifyNoMoreInteractions(ctxMock, entityViewServiceMock); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@EnumSource(TbMsgType.class) |
|||
public void givenMsgTypeAndEmptyMetadata_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) { |
|||
TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); |
|||
verify(ctxMock).tellFailure(eq(msg), throwableCaptor.capture()); |
|||
|
|||
if (msg.isTypeOneOf(ATTRIBUTES_UPDATED, ATTRIBUTES_DELETED, |
|||
ACTIVITY_EVENT, INACTIVITY_EVENT, POST_ATTRIBUTES_REQUEST)) { |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class) |
|||
.hasMessage("Message metadata is empty"); |
|||
return; |
|||
} |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class) |
|||
.hasMessage("Unsupported msg type [" + msgType + "]"); |
|||
|
|||
verifyNoMoreInteractions(ctxMock); |
|||
} |
|||
|
|||
private EntityView getEntityView(TelemetryEntityView attributesEntityView, long startTimeMs, long endTimeMs) { |
|||
EntityView entityView = new EntityView(ENTITY_VIEW_ID); |
|||
entityView.setStartTimeMs(startTimeMs); |
|||
entityView.setEndTimeMs(endTimeMs); |
|||
entityView.setKeys(attributesEntityView); |
|||
return entityView; |
|||
} |
|||
|
|||
private EntityView getEntityView(TelemetryEntityView attributesEntityView) { |
|||
return getEntityView(attributesEntityView, ENTITY_VIEW_START_TS, ENTITY_VIEW_END_TS); |
|||
} |
|||
|
|||
private void mockEntityViewLookup(EntityView entityView) { |
|||
when(ctxMock.getEntityViewService()).thenReturn(entityViewServiceMock); |
|||
when(ctxMock.getTenantId()).thenReturn(TENANT_ID); |
|||
when(entityViewServiceMock.findEntityViewsByTenantIdAndEntityIdAsync(any(), any())) |
|||
.thenReturn(Futures.immediateFuture(List.of(entityView))); |
|||
} |
|||
} |
|||
@ -0,0 +1,155 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.jupiter.api.AfterEach; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.mockito.stubbing.Answer; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.RuleNodeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicBoolean; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.anyLong; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.BDDMockito.given; |
|||
import static org.mockito.BDDMockito.then; |
|||
import static org.mockito.BDDMockito.willAnswer; |
|||
import static org.mockito.Mockito.times; |
|||
|
|||
@Slf4j |
|||
@ExtendWith(MockitoExtension.class) |
|||
public class TbMsgCountNodeTest { |
|||
|
|||
private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("ee682a85-7f5a-4182-91bc-46e555138fe2")); |
|||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("1b21c7cc-0c9e-4ab1-b867-99451599e146")); |
|||
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("04dfbd38-10e5-47b7-925f-11e795db89e1")); |
|||
|
|||
private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("msg-count-node-test"); |
|||
private final TbMsg tickMsg = TbMsg.newMsg(TbMsgType.MSG_COUNT_SELF_MSG, RULE_NODE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); |
|||
|
|||
private ScheduledExecutorService executorService; |
|||
private TbMsgCountNode node; |
|||
private TbMsgCountNodeConfiguration config; |
|||
|
|||
@Mock |
|||
private TbContext ctxMock; |
|||
|
|||
@BeforeEach |
|||
public void setUp() { |
|||
node = new TbMsgCountNode(); |
|||
config = new TbMsgCountNodeConfiguration().defaultConfiguration(); |
|||
executorService = Executors.newSingleThreadScheduledExecutor(factory); |
|||
} |
|||
|
|||
@AfterEach |
|||
public void tearDown() { |
|||
if (executorService != null) { |
|||
executorService.shutdownNow(); |
|||
} |
|||
node.destroy(); |
|||
} |
|||
|
|||
@Test |
|||
public void verifyDefaultConfig() { |
|||
assertThat(config.getInterval()).isEqualTo(1); |
|||
assertThat(config.getTelemetryPrefix()).isEqualTo("messageCount"); |
|||
} |
|||
|
|||
@Test |
|||
public void givenIncomingMsgs_whenOnMsg_thenSendsMsgWithMsgCount() throws TbNodeException, InterruptedException { |
|||
// GIVEN
|
|||
int msgCount = 100; |
|||
var awaitTellSelfLatch = new CountDownLatch(1); |
|||
var currentMsgNumber = new AtomicInteger(0); |
|||
var msgWithCounterSent = new AtomicBoolean(false); |
|||
|
|||
willAnswer((Answer<Void>) invocationOnMock -> { |
|||
executorService.schedule(() -> { |
|||
TbMsg tickMsg = invocationOnMock.getArgument(0); |
|||
msgWithCounterSent.set(true); |
|||
node.onMsg(ctxMock, tickMsg); |
|||
awaitTellSelfLatch.countDown(); |
|||
}, config.getInterval(), TimeUnit.SECONDS); |
|||
return null; |
|||
}).given(ctxMock).tellSelf(any(TbMsg.class), anyLong()); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
given(ctxMock.getServiceId()).willReturn("tb-rule-engine"); |
|||
given(ctxMock.getSelfId()).willReturn(RULE_NODE_ID); |
|||
given(ctxMock.newMsg(null, TbMsgType.MSG_COUNT_SELF_MSG, RULE_NODE_ID, null, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING)).willReturn(tickMsg); |
|||
|
|||
// WHEN
|
|||
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); |
|||
|
|||
var expectedProcessedMsgs = new ArrayList<TbMsg>(); |
|||
for (int i = 0; i < msgCount; i++) { |
|||
var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); |
|||
if (msgWithCounterSent.get()) { |
|||
break; |
|||
} |
|||
node.onMsg(ctxMock, msg); |
|||
expectedProcessedMsgs.add(msg); |
|||
currentMsgNumber.getAndIncrement(); |
|||
} |
|||
|
|||
awaitTellSelfLatch.await(); |
|||
|
|||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
then(ctxMock).should(times(currentMsgNumber.get())).ack(msgCaptor.capture()); |
|||
var actualProcessedMsgs = msgCaptor.getAllValues(); |
|||
assertThat(actualProcessedMsgs).hasSize(expectedProcessedMsgs.size()); |
|||
assertThat(actualProcessedMsgs).isNotEmpty(); |
|||
assertThat(actualProcessedMsgs).containsExactlyInAnyOrderElementsOf(expectedProcessedMsgs); |
|||
|
|||
ArgumentCaptor<TbMsg> msgWithCounterCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
then(ctxMock).should().enqueueForTellNext(msgWithCounterCaptor.capture(), eq(TbNodeConnectionType.SUCCESS)); |
|||
TbMsg resultedMsg = msgWithCounterCaptor.getValue(); |
|||
String expectedData = "{\"messageCount_tb-rule-engine\":" + currentMsgNumber + "}"; |
|||
TbMsg expectedMsg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, expectedData); |
|||
assertThat(resultedMsg).usingRecursiveComparison() |
|||
.ignoringFields("id", "ts", "ctx", "metaData") |
|||
.isEqualTo(expectedMsg); |
|||
Map<String, String> actualMetadata = resultedMsg.getMetaData().getData(); |
|||
assertThat(actualMetadata).hasFieldOrProperty("delta"); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,176 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.aws.sns; |
|||
|
|||
import com.amazonaws.ResponseMetadata; |
|||
import com.amazonaws.services.sns.AmazonSNS; |
|||
import com.amazonaws.services.sns.model.PublishRequest; |
|||
import com.amazonaws.services.sns.model.PublishResult; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.Arguments; |
|||
import org.junit.jupiter.params.provider.MethodSource; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.common.util.ListeningExecutor; |
|||
import org.thingsboard.rule.engine.TestDbCallbackExecutor; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
|
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.Callable; |
|||
import java.util.stream.Stream; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.BDDMockito.given; |
|||
import static org.mockito.BDDMockito.then; |
|||
import static org.mockito.BDDMockito.mock; |
|||
import static org.mockito.BDDMockito.never; |
|||
import static org.mockito.BDDMockito.verifyNoMoreInteractions; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
class TbSnsNodeTest { |
|||
|
|||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("fccfdf2e-6a88-4a94-81dd-5cbb557019cf")); |
|||
private final ListeningExecutor executor = new TestDbCallbackExecutor(); |
|||
|
|||
private TbSnsNode node; |
|||
private TbSnsNodeConfiguration config; |
|||
|
|||
@Mock |
|||
private TbContext ctxMock; |
|||
@Mock |
|||
private AmazonSNS snsClientMock; |
|||
@Mock |
|||
private PublishResult publishResultMock; |
|||
@Mock |
|||
private ResponseMetadata responseMetadataMock; |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
node = new TbSnsNode(); |
|||
config = new TbSnsNodeConfiguration().defaultConfiguration(); |
|||
ReflectionTestUtils.setField(node, "snsClient", snsClientMock); |
|||
ReflectionTestUtils.setField(node, "config", config); |
|||
} |
|||
|
|||
@Test |
|||
void verifyDefaultConfig() { |
|||
assertThat(config.getTopicArnPattern()).isEqualTo("arn:aws:sns:us-east-1:123456789012:MyNewTopic"); |
|||
assertThat(config.getAccessKeyId()).isNull(); |
|||
assertThat(config.getSecretAccessKey()).isNull(); |
|||
assertThat(config.getRegion()).isEqualTo("us-east-1"); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@MethodSource |
|||
void givenForceAckIsTrueAndTopicNamePattern_whenOnMsg_thenEnqueueForTellNext(String topicName, TbMsgMetaData metaData, String data) { |
|||
ReflectionTestUtils.setField(node, "forceAck", true); |
|||
config.setAccessKeyId("accessKeyId"); |
|||
config.setSecretAccessKey("secretAccessKey"); |
|||
config.setTopicArnPattern(topicName); |
|||
String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7"; |
|||
String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31"; |
|||
|
|||
given(ctxMock.getExternalCallExecutor()).willReturn(executor); |
|||
given(snsClientMock.publish(any(PublishRequest.class))).willReturn(publishResultMock); |
|||
given(publishResultMock.getMessageId()).willReturn(messageId); |
|||
given(publishResultMock.getSdkResponseMetadata()).willReturn(responseMetadataMock); |
|||
given(responseMetadataMock.getRequestId()).willReturn(requestId); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
then(ctxMock).should().ack(msg); |
|||
PublishRequest publishRequest = new PublishRequest() |
|||
.withTopicArn(TbNodeUtils.processPattern(topicName, msg)) |
|||
.withMessage(data); |
|||
then(snsClientMock).should().publish(publishRequest); |
|||
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
then(ctxMock).should().enqueueForTellNext(actualMsgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS)); |
|||
TbMsg actualMsg = actualMsgCaptor.getValue(); |
|||
assertThat(actualMsg) |
|||
.usingRecursiveComparison() |
|||
.ignoringFields("metaData", "ctx") |
|||
.isEqualTo(msg); |
|||
assertThat(actualMsg.getMetaData().getData()) |
|||
.hasFieldOrPropertyWithValue("messageId", messageId) |
|||
.hasFieldOrPropertyWithValue("requestId", requestId); |
|||
verifyNoMoreInteractions(ctxMock, snsClientMock, publishResultMock, responseMetadataMock); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenForceAckIsTrueAndTopicNamePattern_whenOnMsg_thenEnqueueForTellNext() { |
|||
return Stream.of( |
|||
Arguments.of("arn:aws:sns:us-east-1:123456789012:NewTopic", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT), |
|||
Arguments.of("arn:aws:sns:us-east-1:123456789012:$[msgTopicName]", TbMsgMetaData.EMPTY, "{\"msgTopicName\":\"msg-topic-name\"}"), |
|||
Arguments.of("arn:aws:sns:us-east-1:123456789012:${mdTopicName}", new TbMsgMetaData(Map.of("mdTopicName", "md-topic-name")), TbMsg.EMPTY_JSON_OBJECT) |
|||
); |
|||
} |
|||
|
|||
@Test |
|||
void givenForceAckIsFalseAndErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() { |
|||
ReflectionTestUtils.setField(node, "forceAck", false); |
|||
ListeningExecutor listeningExecutor = mock(ListeningExecutor.class); |
|||
given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor); |
|||
String errorMsg = "Something went wrong"; |
|||
ListenableFuture<TbMsg> failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg)); |
|||
given(listeningExecutor.executeAsync(any(Callable.class))).willReturn(failedFuture); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
then(ctxMock).should(never()).enqueueForTellNext(any(), any(String.class)); |
|||
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); |
|||
then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), throwableCaptor.capture()); |
|||
TbMsg actualMsg = actualMsgCaptor.getValue(); |
|||
assertThat(actualMsg) |
|||
.usingRecursiveComparison() |
|||
.ignoringFields("metaData", "ctx") |
|||
.isEqualTo(msg); |
|||
assertThat(actualMsg.getMetaData().getData()) |
|||
.hasFieldOrPropertyWithValue("error", RuntimeException.class + ": " + errorMsg); |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); |
|||
verifyNoMoreInteractions(ctxMock, snsClientMock); |
|||
} |
|||
|
|||
@Test |
|||
void givenSnsClientIsNotNull_whenDestroy_thenShutdown() { |
|||
node.destroy(); |
|||
then(snsClientMock).should().shutdown(); |
|||
} |
|||
|
|||
@Test |
|||
void givenSnsClientIsNull_whenDestroy_thenVerifyNoInteractions() { |
|||
ReflectionTestUtils.setField(node, "snsClient", null); |
|||
node.destroy(); |
|||
then(snsClientMock).shouldHaveNoInteractions(); |
|||
} |
|||
} |
|||
@ -0,0 +1,263 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.aws.sqs; |
|||
|
|||
import com.amazonaws.ResponseMetadata; |
|||
import com.amazonaws.services.sqs.AmazonSQS; |
|||
import com.amazonaws.services.sqs.model.MessageAttributeValue; |
|||
import com.amazonaws.services.sqs.model.SendMessageRequest; |
|||
import com.amazonaws.services.sqs.model.SendMessageResult; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.Arguments; |
|||
import org.junit.jupiter.params.provider.MethodSource; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.common.util.ListeningExecutor; |
|||
import org.thingsboard.rule.engine.TestDbCallbackExecutor; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|||
import org.thingsboard.rule.engine.aws.sqs.TbSqsNodeConfiguration.QueueType; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.Callable; |
|||
import java.util.stream.Stream; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.BDDMockito.given; |
|||
import static org.mockito.BDDMockito.then; |
|||
import static org.mockito.BDDMockito.mock; |
|||
import static org.mockito.BDDMockito.never; |
|||
import static org.mockito.BDDMockito.verifyNoMoreInteractions; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
class TbSqsNodeTest { |
|||
|
|||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("764de824-929f-4114-95ea-0ea0401ffa3d")); |
|||
private final ListeningExecutor executor = new TestDbCallbackExecutor(); |
|||
|
|||
private final String messageId = "msgId-1d186a16-80c7-44b3-a245-a1fc835f20c7"; |
|||
private final String requestId = "reqId-bef0799b-dde9-4aa0-855b-86bbafaeaf31"; |
|||
|
|||
private TbSqsNode node; |
|||
private TbSqsNodeConfiguration config; |
|||
|
|||
@Mock |
|||
private TbContext ctxMock; |
|||
@Mock |
|||
private AmazonSQS sqsClientMock; |
|||
@Mock |
|||
private SendMessageResult sendMessageResultMock; |
|||
@Mock |
|||
private ResponseMetadata responseMetadataMock; |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
node = new TbSqsNode(); |
|||
config = new TbSqsNodeConfiguration().defaultConfiguration(); |
|||
ReflectionTestUtils.setField(node, "sqsClient", sqsClientMock); |
|||
ReflectionTestUtils.setField(node, "config", config); |
|||
} |
|||
|
|||
@Test |
|||
void verifyDefaultConfig() { |
|||
assertThat(config.getQueueType()).isEqualTo(QueueType.STANDARD); |
|||
assertThat(config.getQueueUrlPattern()).isEqualTo("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue-name"); |
|||
assertThat(config.getDelaySeconds()).isEqualTo(0); |
|||
assertThat(config.getMessageAttributes()).isEqualTo(Collections.emptyMap()); |
|||
assertThat(config.getAccessKeyId()).isNull(); |
|||
assertThat(config.getSecretAccessKey()).isNull(); |
|||
assertThat(config.getRegion()).isEqualTo("us-east-1"); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@MethodSource |
|||
void givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest(String queueUrl, TbMsgMetaData metaData, String data) { |
|||
config.setQueueType(QueueType.FIFO); |
|||
config.setQueueUrlPattern(queueUrl); |
|||
|
|||
mockSendingMsgRequest(); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
SendMessageRequest sendMsgRequest = new SendMessageRequest() |
|||
.withQueueUrl(TbNodeUtils.processPattern(queueUrl, msg)) |
|||
.withMessageBody(data) |
|||
.withMessageDeduplicationId(msg.getId().toString()) |
|||
.withMessageGroupId(DEVICE_ID.toString()); |
|||
then(sqsClientMock).should().sendMessage(sendMsgRequest); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenQueueUrlPatternsAndQueueTypeIsFifo_whenOnMsg_thenVerifyRequest() { |
|||
return Stream.of( |
|||
Arguments.of( |
|||
"https://sqs.us-east-1.amazonaws.com/123456789012/new-queue-name", |
|||
TbMsgMetaData.EMPTY, |
|||
TbMsg.EMPTY_JSON_OBJECT), |
|||
Arguments.of( |
|||
"https://sqs.us-east-1.amazonaws.com/123456789012/$[msgQueueName]", |
|||
TbMsgMetaData.EMPTY, |
|||
"{\"msgQueueName\":\"msg-queue-name\"}"), |
|||
Arguments.of( |
|||
"https://sqs.us-east-1.amazonaws.com/123456789012/${mdQueueName}", |
|||
new TbMsgMetaData(Map.of("mdQueueName", "md-queue-name")), |
|||
TbMsg.EMPTY_JSON_OBJECT) |
|||
); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@MethodSource |
|||
void givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest(TbMsgMetaData metaData, String data, |
|||
Map<String, String> attributes) { |
|||
config.setMessageAttributes(attributes); |
|||
|
|||
mockSendingMsgRequest(); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); |
|||
this.config.getMessageAttributes().forEach((k, v) -> { |
|||
String name = TbNodeUtils.processPattern(k, msg); |
|||
String val = TbNodeUtils.processPattern(v, msg); |
|||
messageAttributes.put(name, new MessageAttributeValue().withDataType("String").withStringValue(val)); |
|||
}); |
|||
SendMessageRequest sendMsgRequest = new SendMessageRequest() |
|||
.withQueueUrl(config.getQueueUrlPattern()) |
|||
.withMessageBody(data) |
|||
.withMessageAttributes(messageAttributes) |
|||
.withDelaySeconds(config.getDelaySeconds()); |
|||
then(sqsClientMock).should().sendMessage(sendMsgRequest); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenMsgAttributesPatternsAndQueueTypeIsStandard_whenOnMsg_thenVerifyRequest() { |
|||
return Stream.of( |
|||
Arguments.of(TbMsgMetaData.EMPTY, |
|||
TbMsg.EMPTY_JSON_OBJECT, |
|||
Map.of("attributeName", "attributeValue")), |
|||
Arguments.of(TbMsgMetaData.EMPTY, |
|||
"{\"msgAttrNamePattern\":\"msgAttrName\",\"msgAttrValuePattern\":\"msgAttrValue\"}", |
|||
Map.of("$[msgAttrNamePattern]", "$[msgAttrValuePattern]")), |
|||
Arguments.of(new TbMsgMetaData(Map.of("mdAttrNamePattern", "mdAttrName", "mdAttrValuePattern", "mdAttrValue")), |
|||
TbMsg.EMPTY_JSON_OBJECT, |
|||
Map.of("${mdAttrNamePattern}", "${mdAttrValuePattern}")) |
|||
); |
|||
} |
|||
|
|||
@Test |
|||
void givenForceAckIsTrueAndMsgResultContainsBodyAndAttributesAndNumber_whenOnMsg_thenEnqueueForTellNext() { |
|||
ReflectionTestUtils.setField(node, "forceAck", true); |
|||
String messageBodyMd5 = "msgBodyMd5-55fb8ba2-2b71-4673-a82a-969756764761"; |
|||
String messageAttributesMd5 = "msgAttrMd5-e3ba3eef-52ae-436a-bec1-0c2c2252d1f1"; |
|||
String sequenceNumber = "seqNum-bb5ddce0-cf4e-4295-b015-524bdb6a332f"; |
|||
|
|||
mockSendingMsgRequest(); |
|||
given(sendMessageResultMock.getMD5OfMessageBody()).willReturn(messageBodyMd5); |
|||
given(sendMessageResultMock.getMD5OfMessageAttributes()).willReturn(messageAttributesMd5); |
|||
given(sendMessageResultMock.getSequenceNumber()).willReturn(sequenceNumber); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
then(ctxMock).should().ack(msg); |
|||
SendMessageRequest sendMsgRequest = new SendMessageRequest() |
|||
.withQueueUrl(TbNodeUtils.processPattern(config.getQueueUrlPattern(), msg)) |
|||
.withMessageBody(msg.getData()) |
|||
.withDelaySeconds(config.getDelaySeconds()); |
|||
then(sqsClientMock).should().sendMessage(sendMsgRequest); |
|||
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
then(ctxMock).should().enqueueForTellNext(actualMsgCaptor.capture(), eq(TbNodeConnectionType.SUCCESS)); |
|||
TbMsg actualMsg = actualMsgCaptor.getValue(); |
|||
assertThat(actualMsg) |
|||
.usingRecursiveComparison() |
|||
.ignoringFields("metaData", "ctx") |
|||
.isEqualTo(msg); |
|||
assertThat(actualMsg.getMetaData().getData()) |
|||
.hasFieldOrPropertyWithValue("messageId", messageId) |
|||
.hasFieldOrPropertyWithValue("requestId", requestId) |
|||
.hasFieldOrPropertyWithValue("messageBodyMd5", messageBodyMd5) |
|||
.hasFieldOrPropertyWithValue("messageAttributesMd5", messageAttributesMd5) |
|||
.hasFieldOrPropertyWithValue("sequenceNumber", sequenceNumber); |
|||
verifyNoMoreInteractions(ctxMock, sqsClientMock, sendMessageResultMock, responseMetadataMock); |
|||
} |
|||
|
|||
@Test |
|||
void givenForceAckIsFalseAndErrorOccursDuringProcessingRequest_whenOnMsg_thenTellFailure() { |
|||
ReflectionTestUtils.setField(node, "forceAck", false); |
|||
ListeningExecutor listeningExecutor = mock(ListeningExecutor.class); |
|||
given(ctxMock.getExternalCallExecutor()).willReturn(listeningExecutor); |
|||
String errorMsg = "Something went wrong"; |
|||
|
|||
ListenableFuture<TbMsg> failedFuture = Futures.immediateFailedFuture(new RuntimeException(errorMsg)); |
|||
given(listeningExecutor.executeAsync(any(Callable.class))).willReturn(failedFuture); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
then(ctxMock).should(never()).enqueueForTellNext(any(), any(String.class)); |
|||
ArgumentCaptor<TbMsg> actualMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
|||
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); |
|||
then(ctxMock).should().tellFailure(actualMsgCaptor.capture(), throwableCaptor.capture()); |
|||
TbMsg actualMsg = actualMsgCaptor.getValue(); |
|||
assertThat(actualMsg) |
|||
.usingRecursiveComparison() |
|||
.ignoringFields("metaData", "ctx") |
|||
.isEqualTo(msg); |
|||
assertThat(actualMsg.getMetaData().getData()) |
|||
.hasFieldOrPropertyWithValue("error", RuntimeException.class + ": " + errorMsg); |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class).hasMessage(errorMsg); |
|||
verifyNoMoreInteractions(ctxMock, sqsClientMock); |
|||
} |
|||
|
|||
@Test |
|||
void givenSqsClientIsNotNull_whenDestroy_thenShutdown() { |
|||
node.destroy(); |
|||
then(sqsClientMock).should().shutdown(); |
|||
} |
|||
|
|||
@Test |
|||
void givenSqsClientIsNull_whenDestroy_thenVerifyNoInteractions() { |
|||
ReflectionTestUtils.setField(node, "sqsClient", null); |
|||
node.destroy(); |
|||
then(sqsClientMock).shouldHaveNoInteractions(); |
|||
} |
|||
|
|||
private void mockSendingMsgRequest() { |
|||
given(ctxMock.getExternalCallExecutor()).willReturn(executor); |
|||
given(sqsClientMock.sendMessage(any(SendMessageRequest.class))).willReturn(sendMessageResultMock); |
|||
given(sendMessageResultMock.getMessageId()).willReturn(messageId); |
|||
given(sendMessageResultMock.getSdkResponseMetadata()).willReturn(responseMetadataMock); |
|||
given(responseMetadataMock.getRequestId()).willReturn(requestId); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,421 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.rpc; |
|||
|
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.Arguments; |
|||
import org.junit.jupiter.params.provider.EnumSource; |
|||
import org.junit.jupiter.params.provider.MethodSource; |
|||
import org.junit.jupiter.params.provider.NullAndEmptySource; |
|||
import org.junit.jupiter.params.provider.ValueSource; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; |
|||
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse; |
|||
import org.thingsboard.rule.engine.api.RuleEngineRpcService; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType; |
|||
import org.thingsboard.server.common.data.rpc.RpcError; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
|
|||
import java.util.Optional; |
|||
import java.util.Random; |
|||
import java.util.UUID; |
|||
import java.util.function.Consumer; |
|||
import java.util.stream.Stream; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.BDDMockito.given; |
|||
import static org.mockito.BDDMockito.then; |
|||
import static org.mockito.BDDMockito.willAnswer; |
|||
import static org.mockito.Mockito.mock; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class TbSendRPCRequestNodeTest { |
|||
|
|||
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("d3a47f8b-d863-4c1f-b6f0-2c946b43f21c")); |
|||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("b052ae59-b9b4-47e8-ac71-39e7124bbd66")); |
|||
|
|||
private final String MSG_DATA = """ |
|||
{ |
|||
"method": "setGpio", |
|||
"params": { |
|||
"pin": "23", |
|||
"value": 1 |
|||
}, |
|||
"additionalInfo": "information" |
|||
} |
|||
"""; |
|||
|
|||
private TbSendRPCRequestNode node; |
|||
private TbSendRpcRequestNodeConfiguration config; |
|||
|
|||
@Mock |
|||
private TbContext ctxMock; |
|||
@Mock |
|||
private RuleEngineRpcService rpcServiceMock; |
|||
|
|||
@BeforeEach |
|||
public void setUp() throws TbNodeException { |
|||
node = new TbSendRPCRequestNode(); |
|||
config = new TbSendRpcRequestNodeConfiguration().defaultConfiguration(); |
|||
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
node.init(ctxMock, configuration); |
|||
} |
|||
|
|||
@Test |
|||
public void verifyDefaultConfig() { |
|||
assertThat(config.getTimeoutInSeconds()).isEqualTo(60); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@MethodSource |
|||
public void givenOneway_whenOnMsg_thenVerifyRequest(String mdKeyValue, boolean expectedResult) { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsgMetaData msgMetadata = new TbMsgMetaData(); |
|||
msgMetadata.putValue("oneway", mdKeyValue); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, msgMetadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
var ruleEngineDeviceRpcRequestCaptor = captureRequest(); |
|||
assertThat(ruleEngineDeviceRpcRequestCaptor.getValue().isOneway()).isEqualTo(expectedResult); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenOneway_whenOnMsg_thenVerifyRequest() { |
|||
return Stream.of( |
|||
Arguments.of("true", true), |
|||
Arguments.of("false", false), |
|||
Arguments.of(null, false), |
|||
Arguments.of("", false) |
|||
); |
|||
} |
|||
|
|||
@Test |
|||
public void givenMsgBody_whenOnMsg_thenVerifyRequest() { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = ArgumentCaptor.forClass(RuleEngineDeviceRpcRequest.class); |
|||
then(rpcServiceMock).should().sendRpcRequestToDevice(requestCaptor.capture(), any(Consumer.class)); |
|||
assertThat(requestCaptor.getValue()) |
|||
.hasFieldOrPropertyWithValue("method", "setGpio") |
|||
.hasFieldOrPropertyWithValue("body", "{\"pin\":\"23\",\"value\":1}") |
|||
.hasFieldOrPropertyWithValue("deviceId", DEVICE_ID) |
|||
.hasFieldOrPropertyWithValue("tenantId", TENANT_ID) |
|||
.hasFieldOrPropertyWithValue("additionalInfo", "information"); |
|||
} |
|||
|
|||
@Test |
|||
public void givenRequestIdIsNotSet_whenOnMsg_thenVerifyRequest() { |
|||
Random randomMock = mock(Random.class); |
|||
given(randomMock.nextInt()).willReturn(123); |
|||
ReflectionTestUtils.setField(node, "random", randomMock); |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.TO_SERVER_RPC_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getRequestId()).isEqualTo(123); |
|||
} |
|||
|
|||
@Test |
|||
public void givenRequestId_whenOnMsg_thenVerifyRequest() { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
String data = """ |
|||
{ |
|||
"method": "setGpio", |
|||
"params": { |
|||
"pin": "23", |
|||
"value": 1 |
|||
}, |
|||
"requestId": 12345 |
|||
} |
|||
"""; |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.TO_SERVER_RPC_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getRequestId()).isEqualTo(12345); |
|||
} |
|||
|
|||
@Test |
|||
public void givenRequestUUID_whenOnMsg_thenVerifyRequest() { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
String requestUUID = "b795a241-5a30-48fb-92d5-46b864d47130"; |
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue("requestUUID", requestUUID); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getRequestUUID()).isEqualTo(UUID.fromString(requestUUID)); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@NullAndEmptySource |
|||
public void givenInvalidRequestUUID_whenOnMsg_thenVerifyRequest(String requestUUID) { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue("requestUUID", requestUUID); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getRequestUUID()).isNotNull(); |
|||
} |
|||
|
|||
@Test |
|||
public void givenOriginServiceId_whenOnMsg_thenVerifyRequest() { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
String originServiceId = "service-id-123"; |
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue("originServiceId", originServiceId); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getOriginServiceId()).isEqualTo(originServiceId); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@NullAndEmptySource |
|||
public void givenInvalidOriginServiceId_whenOnMsg_thenVerifyRequest(String originServiceId) { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue("originServiceId", originServiceId); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getOriginServiceId()).isNull(); |
|||
} |
|||
|
|||
@Test |
|||
public void givenExpirationTime_whenOnMsg_thenVerifyRequest() { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
String expirationTime = "2000000000000"; |
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue(DataConstants.EXPIRATION_TIME, expirationTime); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getExpirationTime()).isEqualTo(Long.parseLong(expirationTime)); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@NullAndEmptySource |
|||
public void givenInvalidExpirationTime_whenOnMsg_thenVerifyRequest(String expirationTime) { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue(DataConstants.EXPIRATION_TIME, expirationTime); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getExpirationTime()).isGreaterThan(System.currentTimeMillis()); |
|||
} |
|||
|
|||
@Test |
|||
public void givenRetries_whenOnMsg_thenVerifyRequest() { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
Integer retries = 3; |
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue(DataConstants.RETRIES, String.valueOf(retries)); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getRetries()).isEqualTo(retries); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@NullAndEmptySource |
|||
public void givenInvalidRetriesValue_whenOnMsg_thenVerifyRequest(String retries) { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue(DataConstants.RETRIES, retries); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().getRetries()).isNull(); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@EnumSource(TbMsgType.class) |
|||
public void givenTbMsgType_whenOnMsg_thenVerifyRequest(TbMsgType msgType) { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
if (msgType == TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE) { |
|||
assertThat(requestCaptor.getValue().isRestApiCall()).isTrue(); |
|||
return; |
|||
} |
|||
assertThat(requestCaptor.getValue().isRestApiCall()).isFalse(); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@MethodSource |
|||
public void givenPersistent_whenOnMsg_thenVerifyRequest(String isPersisted, boolean expectedPersistence) { |
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
|
|||
TbMsgMetaData metadata = new TbMsgMetaData(); |
|||
metadata.putValue(DataConstants.PERSISTENT, isPersisted); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, metadata, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = captureRequest(); |
|||
assertThat(requestCaptor.getValue().isPersisted()).isEqualTo(expectedPersistence); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenPersistent_whenOnMsg_thenVerifyRequest() { |
|||
return Stream.of( |
|||
Arguments.of("true", true), |
|||
Arguments.of("false", false), |
|||
Arguments.of(null, false), |
|||
Arguments.of("", false) |
|||
); |
|||
} |
|||
|
|||
private ArgumentCaptor<RuleEngineDeviceRpcRequest> captureRequest() { |
|||
ArgumentCaptor<RuleEngineDeviceRpcRequest> requestCaptor = ArgumentCaptor.forClass(RuleEngineDeviceRpcRequest.class); |
|||
then(rpcServiceMock).should().sendRpcRequestToDevice(requestCaptor.capture(), any(Consumer.class)); |
|||
return requestCaptor; |
|||
} |
|||
|
|||
@Test |
|||
public void givenRpcResponseWithoutError_whenOnMsg_thenSendsRpcRequest() { |
|||
TbMsg outMsg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
|
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
// TODO: replace deprecated method newMsg()
|
|||
given(ctxMock.newMsg(any(), any(String.class), any(), any(), any(), any())).willReturn(outMsg); |
|||
willAnswer(invocation -> { |
|||
Consumer<RuleEngineDeviceRpcResponse> consumer = invocation.getArgument(1); |
|||
RuleEngineDeviceRpcResponse rpcResponseMock = mock(RuleEngineDeviceRpcResponse.class); |
|||
given(rpcResponseMock.getError()).willReturn(Optional.empty()); |
|||
given(rpcResponseMock.getResponse()).willReturn(Optional.of(TbMsg.EMPTY_JSON_OBJECT)); |
|||
consumer.accept(rpcResponseMock); |
|||
return null; |
|||
}).given(rpcServiceMock).sendRpcRequestToDevice(any(RuleEngineDeviceRpcRequest.class), any(Consumer.class)); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
then(ctxMock).should().enqueueForTellNext(outMsg, TbNodeConnectionType.SUCCESS); |
|||
then(ctxMock).should().ack(msg); |
|||
} |
|||
|
|||
@Test |
|||
public void givenRpcResponseWithError_whenOnMsg_thenTellFailure() { |
|||
TbMsg outMsg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
|
|||
given(ctxMock.getRpcService()).willReturn(rpcServiceMock); |
|||
given(ctxMock.getTenantId()).willReturn(TENANT_ID); |
|||
// TODO: replace deprecated method newMsg()
|
|||
given(ctxMock.newMsg(any(), any(String.class), any(), any(), any(), any())).willReturn(outMsg); |
|||
willAnswer(invocation -> { |
|||
Consumer<RuleEngineDeviceRpcResponse> consumer = invocation.getArgument(1); |
|||
RuleEngineDeviceRpcResponse rpcResponseMock = mock(RuleEngineDeviceRpcResponse.class); |
|||
given(rpcResponseMock.getError()).willReturn(Optional.of(RpcError.NO_ACTIVE_CONNECTION)); |
|||
consumer.accept(rpcResponseMock); |
|||
return null; |
|||
}).given(rpcServiceMock).sendRpcRequestToDevice(any(RuleEngineDeviceRpcRequest.class), any(Consumer.class)); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.RPC_CALL_FROM_SERVER_TO_DEVICE, DEVICE_ID, TbMsgMetaData.EMPTY, MSG_DATA); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
then(ctxMock).should().enqueueForTellFailure(outMsg, RpcError.NO_ACTIVE_CONNECTION.name()); |
|||
then(ctxMock).should().ack(msg); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@EnumSource(EntityType.class) |
|||
public void givenOriginatorIsNotDevice_whenOnMsg_thenThrowsException(EntityType entityType) { |
|||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, "ac21a1bb-eabf-4463-8313-24bea1f498d9"); |
|||
|
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, entityId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); |
|||
then(ctxMock).should().tellFailure(eq(msg), throwableCaptor.capture()); |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class) |
|||
.hasMessage(EntityType.DEVICE != entityType ? "Message originator is not a device entity!" |
|||
: "Method is not present in the message!"); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@ValueSource(strings = {"method", "params"}) |
|||
public void givenMethodOrParamsAreNotPresent_whenOnMsg_thenThrowsException(String key) { |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, "{\"" + key + "\": \"value\"}"); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); |
|||
then(ctxMock).should().tellFailure(eq(msg), throwableCaptor.capture()); |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(RuntimeException.class) |
|||
.hasMessage(key.equals("method") ? "Params are not present in the message!" : "Method is not present in the message!"); |
|||
} |
|||
} |
|||
@ -0,0 +1,249 @@ |
|||
/** |
|||
* Copyright © 2016-2024 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.telemetry; |
|||
|
|||
import com.google.gson.JsonParser; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.junit.jupiter.params.ParameterizedTest; |
|||
import org.junit.jupiter.params.provider.Arguments; |
|||
import org.junit.jupiter.params.provider.EnumSource; |
|||
import org.junit.jupiter.params.provider.MethodSource; |
|||
import org.mockito.ArgumentCaptor; |
|||
import org.mockito.Mock; |
|||
import org.mockito.junit.jupiter.MockitoExtension; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.server.common.adaptor.JsonConverter; |
|||
import org.thingsboard.server.common.data.TenantProfile; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.id.TenantProfileId; |
|||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|||
import org.thingsboard.server.common.data.kv.KvEntry; |
|||
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; |
|||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.stream.Stream; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.anyList; |
|||
import static org.mockito.ArgumentMatchers.anyLong; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.ArgumentMatchers.isNull; |
|||
import static org.mockito.Mockito.doAnswer; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.mockito.Mockito.verifyNoMoreInteractions; |
|||
import static org.mockito.Mockito.when; |
|||
|
|||
@ExtendWith(MockitoExtension.class) |
|||
public class TbMsgTimeseriesNodeTest { |
|||
|
|||
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); |
|||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384")); |
|||
private final TenantProfileId TENANT_PROFILE_ID = new TenantProfileId(UUID.fromString("ab78dd78-83d0-43fa-869f-d42ec9ed1744")); |
|||
|
|||
private TbMsgTimeseriesNode node; |
|||
private TbMsgTimeseriesNodeConfiguration config; |
|||
private long tenantProfileDefaultStorageTtl; |
|||
|
|||
@Mock |
|||
private TbContext ctxMock; |
|||
@Mock |
|||
private RuleEngineTelemetryService telemetryServiceMock; |
|||
|
|||
@BeforeEach |
|||
public void setUp() throws TbNodeException { |
|||
node = new TbMsgTimeseriesNode(); |
|||
config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); |
|||
} |
|||
|
|||
@Test |
|||
public void verifyDefaultConfig() { |
|||
assertThat(config.getDefaultTTL()).isEqualTo(0L); |
|||
assertThat(config.isSkipLatestPersistence()).isFalse(); |
|||
assertThat(config.isUseServerTs()).isFalse(); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@EnumSource(TbMsgType.class) |
|||
public void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) throws TbNodeException { |
|||
init(); |
|||
TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); |
|||
verify(ctxMock).tellFailure(eq(msg), throwableCaptor.capture()); |
|||
|
|||
if (TbMsgType.POST_TELEMETRY_REQUEST.equals(msgType)) { |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Msg body is empty: " + msg.getData()); |
|||
verifyNoMoreInteractions(ctxMock); |
|||
return; |
|||
} |
|||
assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Unsupported msg type: " + msgType); |
|||
verifyNoMoreInteractions(ctxMock); |
|||
} |
|||
|
|||
@Test |
|||
public void givenTtlFromConfigIsZeroAndUseServiceTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException { |
|||
config.setUseServerTs(true); |
|||
init(); |
|||
|
|||
String data = """ |
|||
{ |
|||
"temp": 45, |
|||
"humidity": 77 |
|||
} |
|||
"""; |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); |
|||
|
|||
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); |
|||
when(ctxMock.getTenantId()).thenReturn(TENANT_ID); |
|||
doAnswer(invocation -> { |
|||
TelemetryNodeCallback callback = invocation.getArgument(5); |
|||
callback.onSuccess(null); |
|||
return null; |
|||
}).when(telemetryServiceMock).saveAndNotify(any(), any(), any(), anyList(), anyLong(), any()); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
List<TsKvEntry> expectedList = getTsKvEntriesListWithTs(data, System.currentTimeMillis()); |
|||
ArgumentCaptor<List<TsKvEntry>> entryListCaptor = ArgumentCaptor.forClass(List.class); |
|||
verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), |
|||
eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); |
|||
assertThat(entryListCaptor.getValue()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts") |
|||
.containsExactlyElementsOf(expectedList); |
|||
verify(ctxMock).tellSuccess(msg); |
|||
verifyNoMoreInteractions(ctxMock, telemetryServiceMock); |
|||
} |
|||
|
|||
@Test |
|||
public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { |
|||
long ttlFromConfig = 5L; |
|||
config.setDefaultTTL(ttlFromConfig); |
|||
config.setSkipLatestPersistence(true); |
|||
init(); |
|||
|
|||
String data = """ |
|||
{ |
|||
"temp": 45, |
|||
"humidity": 77 |
|||
} |
|||
"""; |
|||
long ts = System.currentTimeMillis(); |
|||
var metadata = Map.of("ts", String.valueOf(ts)); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data); |
|||
|
|||
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); |
|||
when(ctxMock.getTenantId()).thenReturn(TENANT_ID); |
|||
doAnswer(invocation -> { |
|||
TelemetryNodeCallback callback = invocation.getArgument(5); |
|||
callback.onSuccess(null); |
|||
return null; |
|||
}).when(telemetryServiceMock).saveWithoutLatestAndNotify(any(), any(), any(), anyList(), anyLong(), any()); |
|||
|
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
List<TsKvEntry> expectedList = getTsKvEntriesListWithTs(data, ts); |
|||
ArgumentCaptor<List<TsKvEntry>> entryListCaptor = ArgumentCaptor.forClass(List.class); |
|||
verify(telemetryServiceMock).saveWithoutLatestAndNotify( |
|||
eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(ttlFromConfig), any(TelemetryNodeCallback.class)); |
|||
assertThat(entryListCaptor.getValue()).containsExactlyElementsOf(expectedList); |
|||
verify(ctxMock).tellSuccess(msg); |
|||
verifyNoMoreInteractions(ctxMock, telemetryServiceMock); |
|||
} |
|||
|
|||
@ParameterizedTest |
|||
@MethodSource |
|||
public void givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl(String ttlFromMd, long ttlFromConfig, long expectedTtl) throws TbNodeException { |
|||
config.setDefaultTTL(ttlFromConfig); |
|||
init(); |
|||
|
|||
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); |
|||
when(ctxMock.getTenantId()).thenReturn(TENANT_ID); |
|||
|
|||
String data = """ |
|||
{ |
|||
"temp": 45, |
|||
"humidity": 77 |
|||
} |
|||
"""; |
|||
var metadata = new TbMsgMetaData(); |
|||
metadata.putValue("TTL", ttlFromMd); |
|||
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metadata, data); |
|||
node.onMsg(ctxMock, msg); |
|||
|
|||
verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), isNull(), eq(DEVICE_ID), anyList(), eq(expectedTtl), any(TelemetryNodeCallback.class)); |
|||
} |
|||
|
|||
private static Stream<Arguments> givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl() { |
|||
return Stream.of( |
|||
// when ttl is present in metadata and it is not zero then ttl = ttl from metadata
|
|||
Arguments.of("1", 2L, 1L), |
|||
// when ttl is absent in metadata and present in config and it is not zero then ttl = ttl from config
|
|||
Arguments.of("", 3L, 3L), |
|||
Arguments.of(null, 4L, 4L), |
|||
// when ttl is present in metadata or config but it is zero then ttl = default ttl from tenant profile
|
|||
Arguments.of("0", 0L, TimeUnit.DAYS.toSeconds(5L)) |
|||
); |
|||
} |
|||
|
|||
private void init() throws TbNodeException { |
|||
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); |
|||
var tenantProfile = getTenantProfile(); |
|||
when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); |
|||
tenantProfile.getProfileConfiguration().ifPresent(profileConfiguration -> |
|||
tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(profileConfiguration.getDefaultStorageTtlDays())); |
|||
node.init(ctxMock, configuration); |
|||
verify(ctxMock).addTenantProfileListener(any()); |
|||
} |
|||
|
|||
private TenantProfile getTenantProfile() { |
|||
var tenantProfile = new TenantProfile(TENANT_PROFILE_ID); |
|||
var tenantProfileData = new TenantProfileData(); |
|||
var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); |
|||
tenantProfileConfiguration.setDefaultStorageTtlDays(5); |
|||
tenantProfileData.setConfiguration(tenantProfileConfiguration); |
|||
tenantProfile.setProfileData(tenantProfileData); |
|||
return tenantProfile; |
|||
} |
|||
|
|||
private static List<TsKvEntry> getTsKvEntriesListWithTs(String data, long ts) { |
|||
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(data), ts); |
|||
List<TsKvEntry> expectedList = new ArrayList<>(); |
|||
for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) { |
|||
for (KvEntry kvEntry : tsKvEntry.getValue()) { |
|||
expectedList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); |
|||
} |
|||
} |
|||
return expectedList; |
|||
} |
|||
|
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue