diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 709f4be250..31d9e477ac 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -29,7 +29,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.SmsService; @@ -101,6 +100,7 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.executors.NotificationExecutorService; +import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.executors.SharedEventLoopGroupService; import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.profile.TbAssetProfileCache; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 3a0ba8408a..58bd82abb3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -25,10 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.LinkedHashMapRemoveEldest; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; @@ -61,7 +57,14 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; +import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; @@ -87,10 +90,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; -import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; import org.thingsboard.server.service.rpc.RpcSubmitStrategy; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.Nullable; @@ -173,7 +173,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private EdgeId findRelatedEdgeId() { List result = - systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON); + systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); if (result != null && result.size() > 0) { EntityRelation relationToEdge = result.get(0); if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index b88674c8ce..ee05e82900 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -44,6 +44,8 @@ import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.TbMsg; @@ -57,6 +59,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -68,8 +71,8 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -77,6 +80,7 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -116,6 +120,7 @@ public class DefaultTbClusterService implements TbClusterService { private final TbDeviceProfileCache deviceProfileCache; private final TbAssetProfileCache assetProfileCache; private final GatewayNotificationsService gatewayNotificationsService; + private final EdgeService edgeService; @Override public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) { @@ -546,11 +551,17 @@ public class DefaultTbClusterService implements TbClusterService { pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null); break; case UNASSIGNED_FROM_EDGE: - pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), null), null); + EdgeId relatedEdgeId = findRelatedEdgeIdIfAny(tenantId, entityId); + pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null); break; } } + private EdgeId findRelatedEdgeIdIfAny(TenantId tenantId, EntityId entityId) { + PageData pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, new PageLink(1)); + return Optional.ofNullable(pageData).filter(pd -> pd.getTotalElements() > 0).map(pd -> pd.getData().get(0)).orElse(null); + } + @Override public void onQueueChange(Queue queue) { log.trace("[{}][{}] Processing queue change [{}] event", queue.getTenantId(), queue.getId(), queue.getName()); diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index b79f37f94a..488bd4ed70 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -25,8 +25,10 @@ import io.netty.handler.codec.mqtt.MqttQoS; import org.awaitility.Awaitility; import org.junit.Assert; import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.TestPropertySource; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -44,15 +46,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.session.FeatureType; -import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; @@ -75,6 +79,7 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -86,6 +91,9 @@ public class DeviceEdgeTest extends AbstractEdgeTest { private static final String DEFAULT_DEVICE_TYPE = "default"; + @Autowired + protected EdgeService edgeService; + @Test public void testDevices() throws Exception { // create device and assign to edge; update device @@ -769,6 +777,54 @@ public class DeviceEdgeTest extends AbstractEdgeTest { } } + @Test + public void testVerifyProcessCorrectEdgeUpdateToDeviceActorOnUnassignFromDifferentEdge() throws Exception { + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + // assign device to another edge + Edge tmpEdge = doPost("/api/edge", constructEdge("Test Tmp Edge", "test"), Edge.class); + doPost("/api/edge/" + tmpEdge.getUuidId() + + "/device/" + device.getUuidId(), Device.class); + List relatedEdgeIds = edgeService.findAllRelatedEdgeIds(tenantId, device.getId()); + Assert.assertEquals(2, relatedEdgeIds.size()); + + // unassign device from edge + doDelete("/api/edge/" + edge.getUuidId() + + "/device/" + device.getUuidId(), Device.class); + relatedEdgeIds = edgeService.findAllRelatedEdgeIds(tenantId, device.getId()); + Assert.assertEquals(1, relatedEdgeIds.size()); + Assert.assertEquals(tmpEdge.getId(), relatedEdgeIds.get(0)); + + // clean up stored edge events + edgeEventService.cleanupEvents(1); + + // perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly + doPostAsync( + "/api/rpc/oneway/" + device.getId().getId().toString(), + JacksonUtil.toString(createDefaultRpc()), + String.class, + status().isOk()); + + final AtomicReference> resultRef = new AtomicReference<>(); + Awaitility.await() + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + PageData result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1)); + resultRef.set(result); + return result != null && result.getData().size() == 1; + }); + + PageData result = resultRef.get(); + EdgeEvent edgeEvent = result.getData().get(0); + Assert.assertEquals(EdgeEventActionType.RPC_CALL, edgeEvent.getAction()); + Assert.assertEquals(EdgeEventType.DEVICE, edgeEvent.getType()); + Assert.assertEquals(tmpEdge.getId(), edgeEvent.getEdgeId()); + Assert.assertEquals(device.getId().getId(), edgeEvent.getEntityId()); + + // clean up tmp edge + doDelete("/api/edge/" + tmpEdge.getId().getId().toString()).andExpect(status().isOk()); + } + private Device buildDeviceForUplinkMsg(String name, String type) { Device device = new Device(); device.setId(new DeviceId(UUID.randomUUID())); @@ -778,7 +834,6 @@ public class DeviceEdgeTest extends AbstractEdgeTest { return device; } - private DeviceCredentials buildDeviceCredentialsForUplinkMsg(DeviceId deviceId) { DeviceCredentials deviceCredentials = new DeviceCredentials(); deviceCredentials.setDeviceId(deviceId); @@ -786,4 +841,20 @@ public class DeviceEdgeTest extends AbstractEdgeTest { deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); return deviceCredentials; } + + private ObjectNode createDefaultRpc() { + ObjectNode rpc = JacksonUtil.newObjectNode(); + rpc.put("method", "setGpio"); + + ObjectNode params = JacksonUtil.newObjectNode(); + + params.put("pin", 7); + params.put("value", 1); + + rpc.set("params", params); + rpc.put("persistent", true); + rpc.put("timeout", 5000); + + return rpc; + } } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index f628037c5d..6efb171fb4 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -29,11 +29,12 @@ import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -73,6 +74,8 @@ public class DefaultTbClusterServiceTest { @MockBean protected GatewayNotificationsService gatewayNotificationsService; @MockBean + protected EdgeService edgeService; + @MockBean protected PartitionService partitionService; @MockBean protected TbQueueProducerProvider producerProvider;