From 7a4afda4ebb74f8e49a40285fb8e8f8c6e9c0146 Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Fri, 28 Nov 2025 13:48:54 +0200 Subject: [PATCH 1/7] Handle dashboard removal from edge event in DashboardEdgeProcessor --- .../edge/rpc/processor/BaseEdgeProcessor.java | 26 +++++++++++++++++++ .../dashboard/BaseDashboardProcessor.java | 14 ++++++++++ .../dashboard/DashboardEdgeProcessor.java | 17 +++--------- .../server/edge/DashboardEdgeTest.java | 23 ++++++++-------- 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index d6bfeae0b7..b216998dee 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -19,12 +19,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.HasCustomerId; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -37,6 +39,7 @@ import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.HasId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; @@ -348,6 +351,29 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { edgeCtx.getRelationService().saveRelation(tenantId, relation); } + protected > void pushEntityEventToRuleEngine(TenantId tenantId, T entity, TbMsgType msgType) { + pushEntityEventToRuleEngine(tenantId, null, entity, msgType); + } + + protected > void pushEntityEventToRuleEngine(TenantId tenantId, Edge edge, T entity, TbMsgType msgType) { + try { + String entityAsString = JacksonUtil.toString(entity); + CustomerId customerId = getCustomerId(entity); + TbMsgMetaData tbMsgMetaData = edge == null ? new TbMsgMetaData() : getEdgeActionTbMsgMetaData(edge, customerId); + + pushEntityEventToRuleEngine(tenantId, entity.getId(), customerId, msgType, entityAsString, tbMsgMetaData); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push entity of type {} action to rule engine: {}", tenantId, entity.getId(), entity.getId().getEntityType(), msgType.name(), e); + } + } + + private > CustomerId getCustomerId(T entity) { + if (entity instanceof HasCustomerId hasCustomer) { + return hasCustomer.getCustomerId(); + } + return null; + } + protected TbMsgMetaData getEdgeActionTbMsgMetaData(Edge edge, CustomerId customerId) { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("edgeId", edge.getId().toString()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java index 56efb9157e..52b33e5297 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java @@ -20,9 +20,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.ShortCustomerInfo; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @@ -100,6 +102,18 @@ public abstract class BaseDashboardProcessor extends BaseEdgeProcessor { } } + protected void deleteDashboard(TenantId tenantId, DashboardId dashboardId) { + deleteDashboard(tenantId, null, dashboardId); + } + + protected void deleteDashboard(TenantId tenantId, Edge edge, DashboardId dashboardId) { + Dashboard dashboardById = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId); + if (dashboardById != null) { + edgeCtx.getDashboardService().deleteDashboard(tenantId, dashboardId); + pushEntityEventToRuleEngine(tenantId, edge, dashboardById, TbMsgType.ENTITY_DELETED); + } + } + protected abstract Set filterNonExistingCustomers(TenantId tenantId, Set currentAssignedCustomers, Set newAssignedCustomers); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java index e1259a7e0e..522c2ba477 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.ShortCustomerInfo; @@ -29,7 +28,6 @@ import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; @@ -59,10 +57,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor implements Da saveOrUpdateDashboard(tenantId, dashboardId, dashboardUpdateMsg, edge); return Futures.immediateFuture(null); case ENTITY_DELETED_RPC_MESSAGE: - Dashboard dashboardToDelete = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId); - if (dashboardToDelete != null) { - edgeCtx.getDashboardService().unassignDashboardFromEdge(tenantId, dashboardId, edge.getId()); - } + deleteDashboard(tenantId, edge, dashboardId); return Futures.immediateFuture(null); case UNRECOGNIZED: default: @@ -90,14 +85,8 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor implements Da } private void pushDashboardCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DashboardId dashboardId) { - try { - Dashboard dashboard = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId); - String dashboardAsString = JacksonUtil.toString(dashboard); - TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null); - pushEntityEventToRuleEngine(tenantId, dashboardId, null, TbMsgType.ENTITY_CREATED, dashboardAsString, msgMetaData); - } catch (Exception e) { - log.warn("[{}][{}] Failed to push dashboard action to rule engine: {}", tenantId, dashboardId, TbMsgType.ENTITY_CREATED.name(), e); - } + Dashboard dashboard = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId); + pushEntityEventToRuleEngine(tenantId, edge, dashboard, TbMsgType.ENTITY_CREATED); } @Override diff --git a/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java index bbf3d17f0d..8150456efb 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java @@ -36,10 +36,11 @@ import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; -import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @DaoSqlTest @@ -227,7 +228,7 @@ public class DashboardEdgeTest extends AbstractEdgeTest { } @Test - public void testSendDeleteEntityViewOnEdgeToCloud() throws Exception { + public void testSendDeleteDashboardOnEdgeToCloud() throws Exception { Dashboard savedDashboard = saveDashboardOnCloudAndVerifyDeliveryToEdge(); UplinkMsg.Builder upLinkMsgBuilder = UplinkMsg.newBuilder(); @@ -244,12 +245,10 @@ public class DashboardEdgeTest extends AbstractEdgeTest { edgeImitator.expectResponsesAmount(1); edgeImitator.sendUplinkMsg(upLinkMsgBuilder.build()); Assert.assertTrue(edgeImitator.waitForResponses()); - DashboardInfo dashboardInfo = doGet("/api/dashboard/info/" + savedDashboard.getUuidId(), DashboardInfo.class); - Assert.assertNotNull(dashboardInfo); - List edgeAssets = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/dashboards?", - new TypeReference>() { - }, new PageLink(100)).getData(); - Assert.assertFalse(edgeAssets.contains(dashboardInfo)); + + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + doGet("/api/dashboard/info/" + savedDashboard.getUuidId(), DashboardInfo.class, status().isNotFound()) + ); } private Dashboard saveDashboardOnCloudAndVerifyDeliveryToEdge() throws Exception { @@ -263,10 +262,10 @@ public class DashboardEdgeTest extends AbstractEdgeTest { Assert.assertTrue(edgeImitator.waitForMessages()); Optional dashboardUpdateMsgOpt = edgeImitator.findMessageByType(DashboardUpdateMsg.class); Assert.assertTrue(dashboardUpdateMsgOpt.isPresent()); - DashboardUpdateMsg entityViewUpdateMsg = dashboardUpdateMsgOpt.get(); - Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, entityViewUpdateMsg.getMsgType()); - Assert.assertEquals(savedDashboard.getUuidId().getMostSignificantBits(), entityViewUpdateMsg.getIdMSB()); - Assert.assertEquals(savedDashboard.getUuidId().getLeastSignificantBits(), entityViewUpdateMsg.getIdLSB()); + DashboardUpdateMsg dashboardUpdateMsg = dashboardUpdateMsgOpt.get(); + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, dashboardUpdateMsg.getMsgType()); + Assert.assertEquals(savedDashboard.getUuidId().getMostSignificantBits(), dashboardUpdateMsg.getIdMSB()); + Assert.assertEquals(savedDashboard.getUuidId().getLeastSignificantBits(), dashboardUpdateMsg.getIdLSB()); return savedDashboard; } From 14171e30a1fa648324bc3ef0c69456afe53f2291 Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Fri, 28 Nov 2025 14:15:19 +0200 Subject: [PATCH 2/7] Handle asset removal from edge event in AssetEdgeProcessor --- .../edge/rpc/processor/BaseEdgeProcessor.java | 2 +- .../processor/asset/AssetEdgeProcessor.java | 5 +---- .../processor/asset/BaseAssetProcessor.java | 18 ++++++++++++++++++ .../thingsboard/server/edge/AssetEdgeTest.java | 16 ++++++---------- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index b216998dee..b131450132 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -363,7 +363,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { pushEntityEventToRuleEngine(tenantId, entity.getId(), customerId, msgType, entityAsString, tbMsgMetaData); } catch (Exception e) { - log.warn("[{}][{}] Failed to push entity of type {} action to rule engine: {}", tenantId, entity.getId(), entity.getId().getEntityType(), msgType.name(), e); + log.warn("[{}][{}] Failed to push entity action for {} to rule engine: {}", tenantId, entity.getId(), entity.getId().getEntityType(), msgType.name(), e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index cba2b62af1..573315c004 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -61,10 +61,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor implements AssetProce saveOrUpdateAsset(tenantId, assetId, assetUpdateMsg, edge); return Futures.immediateFuture(null); case ENTITY_DELETED_RPC_MESSAGE: - Asset assetToDelete = edgeCtx.getAssetService().findAssetById(tenantId, assetId); - if (assetToDelete != null) { - edgeCtx.getAssetService().unassignAssetFromEdge(tenantId, assetId, edge.getId()); - } + deleteAsset(tenantId, edge, assetId); return Futures.immediateFuture(null); case UNRECOGNIZED: default: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java index 8a8d89e52b..1bcbcbf11f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java @@ -21,9 +21,11 @@ import org.springframework.data.util.Pair; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @@ -77,4 +79,20 @@ public abstract class BaseAssetProcessor extends BaseEdgeProcessor { protected abstract void setCustomerId(TenantId tenantId, CustomerId customerId, Asset asset, AssetUpdateMsg assetUpdateMsg); + protected void deleteAsset(TenantId tenantId, AssetId assetId) { + Asset assetById = edgeCtx.getAssetService().findAssetById(tenantId, assetId); + if (assetById != null) { + edgeCtx.getAssetService().deleteAsset(tenantId, assetId); + pushEntityEventToRuleEngine(tenantId, null, assetById, TbMsgType.ENTITY_DELETED); + } + } + + protected void deleteAsset(TenantId tenantId, Edge edge, AssetId assetId) { + Asset assetById = edgeCtx.getAssetService().findAssetById(tenantId, assetId); + if (assetById != null) { + edgeCtx.getAssetService().deleteAsset(tenantId, assetId); + pushEntityEventToRuleEngine(tenantId, edge, assetById, TbMsgType.ENTITY_DELETED); + } + } + } diff --git a/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java index 4d9840b936..66e59fbc73 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.edge; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; @@ -28,8 +27,6 @@ import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -37,10 +34,11 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg; -import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @DaoSqlTest @@ -277,12 +275,10 @@ public class AssetEdgeTest extends AbstractEdgeTest { edgeImitator.expectResponsesAmount(1); edgeImitator.sendUplinkMsg(upLinkMsgBuilder.build()); Assert.assertTrue(edgeImitator.waitForResponses()); - AssetInfo assetInfo = doGet("/api/asset/info/" + savedAsset.getUuidId(), AssetInfo.class); - Assert.assertNotNull(assetInfo); - List edgeAssets = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/assets?", - new TypeReference>() { - }, new PageLink(100)).getData(); - Assert.assertFalse(edgeAssets.contains(assetInfo)); + + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + doGet("/api/asset/info/" + savedAsset.getUuidId(), AssetInfo.class, status().isNotFound()) + ); } private Asset saveAssetOnCloudAndVerifyDeliveryToEdge() throws Exception { From 2054b1f5f9128440b52c09962430acaae2e15c76 Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Fri, 28 Nov 2025 14:17:58 +0200 Subject: [PATCH 3/7] Refactor and clean up AssetEdgeProcessor --- .../processor/asset/AssetEdgeProcessor.java | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index 573315c004..a4b35ab541 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -55,18 +55,17 @@ public class AssetEdgeProcessor extends BaseAssetProcessor implements AssetProce try { edgeSynchronizationManager.getEdgeId().set(edge.getId()); - switch (assetUpdateMsg.getMsgType()) { - case ENTITY_CREATED_RPC_MESSAGE: - case ENTITY_UPDATED_RPC_MESSAGE: + return switch (assetUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE, ENTITY_UPDATED_RPC_MESSAGE -> { saveOrUpdateAsset(tenantId, assetId, assetUpdateMsg, edge); - return Futures.immediateFuture(null); - case ENTITY_DELETED_RPC_MESSAGE: + yield Futures.immediateFuture(null); + } + case ENTITY_DELETED_RPC_MESSAGE -> { deleteAsset(tenantId, edge, assetId); - return Futures.immediateFuture(null); - case UNRECOGNIZED: - default: - return handleUnsupportedMsgType(assetUpdateMsg.getMsgType()); - } + yield Futures.immediateFuture(null); + } + default -> handleUnsupportedMsgType(assetUpdateMsg.getMsgType()); + }; } catch (DataValidationException e) { if (e.getMessage().contains("limit reached")) { log.warn("[{}] Number of allowed asset violated {}", tenantId, assetUpdateMsg, e); @@ -94,14 +93,8 @@ public class AssetEdgeProcessor extends BaseAssetProcessor implements AssetProce } private void pushAssetCreatedEventToRuleEngine(TenantId tenantId, Edge edge, AssetId assetId) { - try { - Asset asset = edgeCtx.getAssetService().findAssetById(tenantId, assetId); - String assetAsString = JacksonUtil.toString(asset); - TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, asset.getCustomerId()); - pushEntityEventToRuleEngine(tenantId, assetId, asset.getCustomerId(), TbMsgType.ENTITY_CREATED, assetAsString, msgMetaData); - } catch (Exception e) { - log.warn("[{}][{}] Failed to push asset action to rule engine: {}", tenantId, assetId, TbMsgType.ENTITY_CREATED.name(), e); - } + Asset asset = edgeCtx.getAssetService().findAssetById(tenantId, assetId); + pushEntityEventToRuleEngine(tenantId, edge, asset, TbMsgType.ENTITY_CREATED); } @Override From 252d8b6174539ebfdac012534662550c913e4b31 Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Fri, 28 Nov 2025 14:47:23 +0200 Subject: [PATCH 4/7] Handle device removal from edge event in DeviceEdgeProcessor --- .../rpc/processor/asset/BaseAssetProcessor.java | 6 +----- .../rpc/processor/device/BaseDeviceProcessor.java | 13 +++++++++++++ .../rpc/processor/device/DeviceEdgeProcessor.java | 15 +++------------ .../thingsboard/server/edge/DeviceEdgeTest.java | 11 +++++------ 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java index 1bcbcbf11f..b6b4dbed5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/BaseAssetProcessor.java @@ -80,11 +80,7 @@ public abstract class BaseAssetProcessor extends BaseEdgeProcessor { protected abstract void setCustomerId(TenantId tenantId, CustomerId customerId, Asset asset, AssetUpdateMsg assetUpdateMsg); protected void deleteAsset(TenantId tenantId, AssetId assetId) { - Asset assetById = edgeCtx.getAssetService().findAssetById(tenantId, assetId); - if (assetById != null) { - edgeCtx.getAssetService().deleteAsset(tenantId, assetId); - pushEntityEventToRuleEngine(tenantId, null, assetById, TbMsgType.ENTITY_DELETED); - } + deleteAsset(tenantId, null, assetId); } protected void deleteAsset(TenantId tenantId, Edge edge, AssetId assetId) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java index 9ab1cf6dd9..3f516de44b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java @@ -21,9 +21,11 @@ import org.springframework.data.util.Pair; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; @@ -110,4 +112,15 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor { protected abstract void setCustomerId(TenantId tenantId, CustomerId customerId, Device device, DeviceUpdateMsg deviceUpdateMsg); + protected void deleteDevice(TenantId tenantId, DeviceId deviceId) { + deleteDevice(tenantId, null, deviceId); + } + + protected void deleteDevice(TenantId tenantId, Edge edge, DeviceId deviceId) { + Device deviceById = edgeCtx.getDeviceService().findDeviceById(tenantId, deviceId); + if (deviceById != null) { + edgeCtx.getDeviceService().deleteDevice(tenantId, deviceId); + pushEntityEventToRuleEngine(tenantId, edge, deviceById, TbMsgType.ENTITY_DELETED); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 763821f11c..925e903ab2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -76,10 +76,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor implements DevicePr saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, edge); return Futures.immediateFuture(null); case ENTITY_DELETED_RPC_MESSAGE: - Device deviceToDelete = edgeCtx.getDeviceService().findDeviceById(tenantId, deviceId); - if (deviceToDelete != null) { - edgeCtx.getDeviceService().unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); - } + deleteDevice(tenantId, edge, deviceId); return Futures.immediateFuture(null); case UNRECOGNIZED: default: @@ -125,14 +122,8 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor implements DevicePr } private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceId deviceId) { - try { - Device device = edgeCtx.getDeviceService().findDeviceById(tenantId, deviceId); - String deviceAsString = JacksonUtil.toString(device); - TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, device.getCustomerId()); - pushEntityEventToRuleEngine(tenantId, deviceId, device.getCustomerId(), TbMsgType.ENTITY_CREATED, deviceAsString, msgMetaData); - } catch (Exception e) { - log.warn("[{}][{}] Failed to push device action to rule engine: {}", tenantId, deviceId, TbMsgType.ENTITY_CREATED.name(), e); - } + Device device = edgeCtx.getDeviceService().findDeviceById(tenantId, deviceId); + pushEntityEventToRuleEngine(tenantId, edge, device, TbMsgType.ENTITY_CREATED); } @Override diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index da8e7563e0..8d79416022 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -79,6 +79,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -413,12 +414,10 @@ public class DeviceEdgeTest extends AbstractEdgeTest { edgeImitator.expectResponsesAmount(1); edgeImitator.sendUplinkMsg(upLinkMsgBuilder.build()); Assert.assertTrue(edgeImitator.waitForResponses()); - DeviceInfo deviceInfo = doGet("/api/device/info/" + savedDevice.getUuidId(), DeviceInfo.class); - Assert.assertNotNull(deviceInfo); - List edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/devices?", - new TypeReference>() { - }, new PageLink(100)).getData(); - Assert.assertFalse(edgeDevices.contains(deviceInfo)); + + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + doGet("/api/device/info/" + savedDevice.getUuidId(), DeviceInfo.class, status().isNotFound()) + ); } @Test From 394075c9df75b9528ef69206544d09e3f3a9580c Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Fri, 28 Nov 2025 15:11:32 +0200 Subject: [PATCH 5/7] Handle entity view removal from edge event in EntityViewEdgeProcessor --- .../entityview/BaseEntityViewProcessor.java | 13 +++++++ .../entityview/EntityViewEdgeProcessor.java | 34 +++++++------------ .../server/edge/EntityViewEdgeTest.java | 15 +++----- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/BaseEntityViewProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/BaseEntityViewProcessor.java index 4ee532a33b..97f712c5ac 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/BaseEntityViewProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/BaseEntityViewProcessor.java @@ -21,9 +21,11 @@ import org.springframework.data.util.Pair; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.gen.edge.v1.EntityViewUpdateMsg; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @@ -69,4 +71,15 @@ public abstract class BaseEntityViewProcessor extends BaseEdgeProcessor { protected abstract void setCustomerId(TenantId tenantId, CustomerId customerId, EntityView entityView, EntityViewUpdateMsg entityViewUpdateMsg); + protected void deleteEntityView(TenantId tenantId, EntityViewId entityViewId) { + deleteEntityView(tenantId, null, entityViewId); + } + + protected void deleteEntityView(TenantId tenantId, Edge edge, EntityViewId entityViewId) { + EntityView entityViewById = edgeCtx.getEntityViewService().findEntityViewById(tenantId, entityViewId); + if (entityViewById != null) { + edgeCtx.getEntityViewService().deleteEntityView(tenantId, entityViewId); + pushEntityEventToRuleEngine(tenantId, edge, entityViewById, TbMsgType.ENTITY_DELETED); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java index 56785f9ac0..42757764db 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java @@ -54,21 +54,17 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor implements try { edgeSynchronizationManager.getEdgeId().set(edge.getId()); - switch (entityViewUpdateMsg.getMsgType()) { - case ENTITY_CREATED_RPC_MESSAGE: - case ENTITY_UPDATED_RPC_MESSAGE: + return switch (entityViewUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE, ENTITY_UPDATED_RPC_MESSAGE -> { saveOrUpdateEntityView(tenantId, entityViewId, entityViewUpdateMsg, edge); - return Futures.immediateFuture(null); - case ENTITY_DELETED_RPC_MESSAGE: - EntityView entityViewToDelete = edgeCtx.getEntityViewService().findEntityViewById(tenantId, entityViewId); - if (entityViewToDelete != null) { - edgeCtx.getEntityViewService().unassignEntityViewFromEdge(tenantId, entityViewId, edge.getId()); - } - return Futures.immediateFuture(null); - case UNRECOGNIZED: - default: - return handleUnsupportedMsgType(entityViewUpdateMsg.getMsgType()); - } + yield Futures.immediateFuture(null); + } + case ENTITY_DELETED_RPC_MESSAGE -> { + deleteEntityView(tenantId, entityViewId); + yield Futures.immediateFuture(null); + } + default -> handleUnsupportedMsgType(entityViewUpdateMsg.getMsgType()); + }; } catch (DataValidationException e) { if (e.getMessage().contains("limit reached")) { log.warn("[{}] Number of allowed entity views violated {}", tenantId, entityViewUpdateMsg, e); @@ -96,14 +92,8 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor implements } private void pushEntityViewCreatedEventToRuleEngine(TenantId tenantId, Edge edge, EntityViewId entityViewId) { - try { - EntityView entityView = edgeCtx.getEntityViewService().findEntityViewById(tenantId, entityViewId); - String entityViewAsString = JacksonUtil.toString(entityView); - TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, entityView.getCustomerId()); - pushEntityEventToRuleEngine(tenantId, entityViewId, entityView.getCustomerId(), TbMsgType.ENTITY_CREATED, entityViewAsString, msgMetaData); - } catch (Exception e) { - log.warn("[{}][{}] Failed to push entity view action to rule engine: {}", tenantId, entityViewId, TbMsgType.ENTITY_CREATED.name(), e); - } + EntityView entityView = edgeCtx.getEntityViewService().findEntityViewById(tenantId, entityViewId); + pushEntityEventToRuleEngine(tenantId, edge, entityView, TbMsgType.ENTITY_CREATED); } @Override diff --git a/application/src/test/java/org/thingsboard/server/edge/EntityViewEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/EntityViewEdgeTest.java index 875a83e3f5..2ff52d1c4d 100644 --- a/application/src/test/java/org/thingsboard/server/edge/EntityViewEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/EntityViewEdgeTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.edge; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.protobuf.AbstractMessage; import com.google.protobuf.InvalidProtocolBufferException; import org.junit.Assert; @@ -31,8 +30,6 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.edge.v1.EntityViewUpdateMsg; import org.thingsboard.server.gen.edge.v1.EntityViewsRequestMsg; @@ -40,10 +37,11 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg; -import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @DaoSqlTest @@ -256,12 +254,9 @@ public class EntityViewEdgeTest extends AbstractEdgeTest { edgeImitator.expectResponsesAmount(1); edgeImitator.sendUplinkMsg(upLinkMsgBuilder.build()); Assert.assertTrue(edgeImitator.waitForResponses()); - EntityViewInfo entityViewInfo = doGet("/api/entityView/info/" + savedEntityView.getUuidId(), EntityViewInfo.class); - Assert.assertNotNull(entityViewInfo); - List edgeAssets = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/entityViews?", - new TypeReference>() { - }, new PageLink(100)).getData(); - Assert.assertFalse(edgeAssets.contains(entityViewInfo)); + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + doGet("/api/entityView/info/" + savedEntityView.getUuidId(), EntityViewInfo.class, status().isNotFound()) + ); } private void verifyEntityViewUpdateMsg(EntityView entityView, Device device) throws InvalidProtocolBufferException { From 56e877dc6d9b2ba91f242685eb6c03daf2e579bf Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Fri, 28 Nov 2025 15:28:38 +0200 Subject: [PATCH 6/7] Add edge argument to EntityViewEdgeProcessor#deleteEntityView --- .../edge/rpc/processor/entityview/EntityViewEdgeProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java index 42757764db..3b6aafea74 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java @@ -60,7 +60,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor implements yield Futures.immediateFuture(null); } case ENTITY_DELETED_RPC_MESSAGE -> { - deleteEntityView(tenantId, entityViewId); + deleteEntityView(tenantId, edge, entityViewId); yield Futures.immediateFuture(null); } default -> handleUnsupportedMsgType(entityViewUpdateMsg.getMsgType()); From 616ae5d0ce832f7c6b319ed7079d335637c00041 Mon Sep 17 00:00:00 2001 From: Nikita Mazurenko Date: Mon, 1 Dec 2025 11:57:42 +0200 Subject: [PATCH 7/7] Replace 'new TbMsgMetaData()' with 'TbMsgMetaData.EMPTY' in pushEntityEventToRuleEngine --- .../server/service/edge/rpc/processor/BaseEdgeProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index b131450132..dee288d257 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -359,7 +359,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { try { String entityAsString = JacksonUtil.toString(entity); CustomerId customerId = getCustomerId(entity); - TbMsgMetaData tbMsgMetaData = edge == null ? new TbMsgMetaData() : getEdgeActionTbMsgMetaData(edge, customerId); + TbMsgMetaData tbMsgMetaData = edge == null ? TbMsgMetaData.EMPTY : getEdgeActionTbMsgMetaData(edge, customerId); pushEntityEventToRuleEngine(tenantId, entity.getId(), customerId, msgType, entityAsString, tbMsgMetaData); } catch (Exception e) {