From d1a0dd95f6f7bf39c56dfeb3b4b0b84ec424549b Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 5 Jan 2024 11:28:45 +0200 Subject: [PATCH 1/6] Improve RPC device actor init to find edgeId by CONSTAINS_TYPE --- .../server/actors/ActorSystemContext.java | 2 +- .../device/DeviceActorMessageProcessor.java | 16 ++++++++-------- .../service/queue/DefaultTbClusterService.java | 15 +++++++++++++-- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 09dcf3ca1e..2dc73c51c7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -29,7 +29,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.SmsService; @@ -101,6 +100,7 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.executors.NotificationExecutorService; +import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.executors.SharedEventLoopGroupService; import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.profile.TbAssetProfileCache; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 50e237ac73..aee8a33ab7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -25,10 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.LinkedHashMapRemoveEldest; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; @@ -61,7 +57,14 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; +import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; @@ -87,10 +90,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; -import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; import org.thingsboard.server.service.rpc.RpcSubmitStrategy; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.Nullable; @@ -173,7 +173,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private EdgeId findRelatedEdgeId() { List result = - systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON); + systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); if (result != null && result.size() > 0) { EntityRelation relationToEdge = result.get(0); if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 14930912d1..cfa87c0508 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -44,6 +44,8 @@ import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.TbMsg; @@ -57,6 +59,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -68,8 +71,8 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -77,6 +80,7 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -116,6 +120,7 @@ public class DefaultTbClusterService implements TbClusterService { private final TbDeviceProfileCache deviceProfileCache; private final TbAssetProfileCache assetProfileCache; private final GatewayNotificationsService gatewayNotificationsService; + private final EdgeService edgeService; @Override public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) { @@ -546,11 +551,17 @@ public class DefaultTbClusterService implements TbClusterService { pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null); break; case UNASSIGNED_FROM_EDGE: - pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), null), null); + EdgeId relatedEdgeId = findRelatedEdgeEdgeIdIfAny(tenantId, entityId); + pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null); break; } } + private EdgeId findRelatedEdgeEdgeIdIfAny(TenantId tenantId, EntityId entityId) { + PageData pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, new PageLink(1)); + return Optional.ofNullable(pageData).filter(pd -> pd.getTotalElements() == 1).map(pd -> pd.getData().get(0)).orElse(null); + } + @Override public void onQueueChange(Queue queue) { log.trace("[{}][{}] Processing queue change [{}] event", queue.getTenantId(), queue.getId(), queue.getName()); From a8e436c102983761a5b376d5a4c2470dbb30a611 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 5 Jan 2024 11:34:07 +0200 Subject: [PATCH 2/6] Improve naming for method findRelatedEdgeIdIfAny --- .../server/service/queue/DefaultTbClusterService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index cfa87c0508..21d1d2022d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -551,13 +551,13 @@ public class DefaultTbClusterService implements TbClusterService { pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null); break; case UNASSIGNED_FROM_EDGE: - EdgeId relatedEdgeId = findRelatedEdgeEdgeIdIfAny(tenantId, entityId); + EdgeId relatedEdgeId = findRelatedEdgeIdIfAny(tenantId, entityId); pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null); break; } } - private EdgeId findRelatedEdgeEdgeIdIfAny(TenantId tenantId, EntityId entityId) { + private EdgeId findRelatedEdgeIdIfAny(TenantId tenantId, EntityId entityId) { PageData pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, new PageLink(1)); return Optional.ofNullable(pageData).filter(pd -> pd.getTotalElements() == 1).map(pd -> pd.getData().get(0)).orElse(null); } From e2b2dea9685b86c6416cf233a81cab46f950caa0 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 5 Jan 2024 12:31:43 +0200 Subject: [PATCH 3/6] Improve findRelatedEdgeIdIfAny to check for total elements > 0 --- .../server/service/queue/DefaultTbClusterService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 21d1d2022d..d567d1b447 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -559,7 +559,7 @@ public class DefaultTbClusterService implements TbClusterService { private EdgeId findRelatedEdgeIdIfAny(TenantId tenantId, EntityId entityId) { PageData pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, new PageLink(1)); - return Optional.ofNullable(pageData).filter(pd -> pd.getTotalElements() == 1).map(pd -> pd.getData().get(0)).orElse(null); + return Optional.ofNullable(pageData).filter(pd -> pd.getTotalElements() > 0).map(pd -> pd.getData().get(0)).orElse(null); } @Override From e227caac514e4fa5b117d59687419e0455a0ffcb Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 5 Jan 2024 14:27:36 +0200 Subject: [PATCH 4/6] Provide test for fixed bug --- .../server/edge/DeviceEdgeTest.java | 65 ++++++++++++++++++- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 7bb05cb4d5..c9a7a97c9a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -25,8 +25,10 @@ import io.netty.handler.codec.mqtt.MqttQoS; import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.TestPropertySource; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -44,15 +46,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.session.FeatureType; -import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; @@ -86,6 +90,9 @@ public class DeviceEdgeTest extends AbstractEdgeTest { private static final String DEFAULT_DEVICE_TYPE = "default"; + @Autowired + protected EdgeService edgeService; + @Test public void testDevices() throws Exception { // create device and assign to edge; update device @@ -769,6 +776,45 @@ public class DeviceEdgeTest extends AbstractEdgeTest { } } + @Test + public void testVerifyProcessCorrectEdgeUpdateToDeviceActorOnUnassignFromDifferentEdge() throws Exception { + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + // assign device to another edge + Edge tmpEdge = doPost("/api/edge", constructEdge("Test Tmp Edge", "test"), Edge.class); + doPost("/api/edge/" + tmpEdge.getUuidId() + + "/device/" + device.getUuidId(), Device.class); + List relatedEdgeIds = edgeService.findAllRelatedEdgeIds(tenantId, device.getId()); + Assert.assertEquals(2, relatedEdgeIds.size()); + + // unassign device from edge + doDelete("/api/edge/" + edge.getUuidId() + + "/device/" + device.getUuidId(), Device.class); + relatedEdgeIds = edgeService.findAllRelatedEdgeIds(tenantId, device.getId()); + Assert.assertEquals(1, relatedEdgeIds.size()); + Assert.assertEquals(tmpEdge.getId(), relatedEdgeIds.get(0)); + + // clean up stored edge events + edgeEventService.cleanupEvents(1); + + // perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly + doPostAsync( + "/api/rpc/oneway/" + device.getId().getId().toString(), + JacksonUtil.toString(createDefaultRpc()), + String.class, + status().isOk()); + + PageData result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1)); + EdgeEvent edgeEvent = result.getData().get(0); + Assert.assertEquals(EdgeEventActionType.RPC_CALL, edgeEvent.getAction()); + Assert.assertEquals(EdgeEventType.DEVICE, edgeEvent.getType()); + Assert.assertEquals(tmpEdge.getId(), edgeEvent.getEdgeId()); + Assert.assertEquals(device.getId().getId(), edgeEvent.getEntityId()); + + // clean up tmp edge + doDelete("/api/edge/" + tmpEdge.getId().getId().toString()).andExpect(status().isOk()); + } + private Device buildDeviceForUplinkMsg(String name, String type) { Device device = new Device(); device.setId(new DeviceId(UUID.randomUUID())); @@ -778,7 +824,6 @@ public class DeviceEdgeTest extends AbstractEdgeTest { return device; } - private DeviceCredentials buildDeviceCredentialsForUplinkMsg(DeviceId deviceId) { DeviceCredentials deviceCredentials = new DeviceCredentials(); deviceCredentials.setDeviceId(deviceId); @@ -786,4 +831,20 @@ public class DeviceEdgeTest extends AbstractEdgeTest { deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); return deviceCredentials; } + + private ObjectNode createDefaultRpc() { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("method", "setGpio"); + + ObjectNode params = JacksonUtil.newObjectNode(); + + params.put("pin", 7); + params.put("value", 1); + + rpc.set("params", params); + rpc.put("persistent", true); + rpc.put("timeout", 5000); + + return rpc; + } } From 6252433231c7bcb0c2258442994e87f7cc943926 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 5 Jan 2024 14:34:32 +0200 Subject: [PATCH 5/6] Fix DefaultTbClusterServiceTest --- .../server/service/queue/DefaultTbClusterServiceTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index 233839bb5f..9d98ec25c7 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -29,11 +29,12 @@ import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -73,6 +74,8 @@ public class DefaultTbClusterServiceTest { @MockBean protected GatewayNotificationsService gatewayNotificationsService; @MockBean + protected EdgeService edgeService; + @MockBean protected PartitionService partitionService; @MockBean protected TbQueueProducerProvider producerProvider; From f15baf38ceea224d93c1bef1ebc5eb2ab154df4a Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 5 Jan 2024 15:09:30 +0200 Subject: [PATCH 6/6] Improve stability for testVerifyProcessCorrectEdgeUpdateToDeviceActorOnUnassignFromDifferentEdge --- .../org/thingsboard/server/edge/DeviceEdgeTest.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index c9a7a97c9a..e2bfee9aae 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -79,6 +79,7 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -804,7 +805,16 @@ public class DeviceEdgeTest extends AbstractEdgeTest { String.class, status().isOk()); - PageData result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1)); + final AtomicReference> resultRef = new AtomicReference<>(); + Awaitility.await() + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + PageData result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1)); + resultRef.set(result); + return result != null && result.getData().size() == 1; + }); + + PageData result = resultRef.get(); EdgeEvent edgeEvent = result.getData().get(0); Assert.assertEquals(EdgeEventActionType.RPC_CALL, edgeEvent.getAction()); Assert.assertEquals(EdgeEventType.DEVICE, edgeEvent.getType());