diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index c22b44d7cf..7a0b4617a5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -242,11 +242,9 @@ public class TenantActor extends RuleChainManagerActor { EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService(); if (msg.getEvent() == ComponentLifecycleEvent.DELETED) { edgeRpcService.deleteEdge(tenantId, edgeId); - } else { + } else if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) { Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId); - if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) { - edgeRpcService.updateEdge(tenantId, edge); - } + edgeRpcService.updateEdge(tenantId, edge); } } else if (isRuleEngine) { TbActorRef target = getEntityActorRef(msg.getEntityId()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 4829e1cd98..198c433416 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -40,7 +40,6 @@ import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.EntityEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor; @@ -64,9 +63,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @Autowired private TbClusterService clusterService; - @Autowired - private EdgeProcessor edgeProcessor; - @Autowired private EntityEdgeProcessor entityProcessor; @@ -124,8 +120,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { ListenableFuture future; switch (type) { case EDGE: - future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg); - break; case ASSET: case DEVICE: case ENTITY_VIEW: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index cd2b8fa626..089cb6d794 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -36,6 +36,7 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings; +import org.thingsboard.server.service.edge.rpc.constructor.EdgeMsgConstructor; import org.thingsboard.server.service.edge.rpc.processor.AdminSettingsEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.AssetEdgeProcessor; @@ -43,6 +44,7 @@ import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.DashboardEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.DeviceEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.EntityEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.EntityViewEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.OtaPackageEdgeProcessor; @@ -117,6 +119,9 @@ public class EdgeContextComponent { @Autowired private DeviceProfileEdgeProcessor deviceProfileProcessor; + @Autowired + private EdgeProcessor edgeProcessor; + @Autowired private DeviceEdgeProcessor deviceProcessor; @@ -162,6 +167,9 @@ public class EdgeContextComponent { @Autowired private QueueEdgeProcessor queueEdgeProcessor; + @Autowired + private EdgeMsgConstructor edgeMsgConstructor; + @Autowired private EdgeEventStorageSettings edgeEventStorageSettings; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 53375ff1fc..d42c1f0df9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -24,7 +24,6 @@ import io.grpc.stub.StreamObserver; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; @@ -289,7 +288,7 @@ public final class EdgeGrpcSession implements Closeable { log.debug("[{}] onConfigurationUpdate [{}]", this.sessionId, edge); this.edge = edge; EdgeUpdateMsg edgeConfig = EdgeUpdateMsg.newBuilder() - .setConfiguration(constructEdgeConfigProto(edge)).build(); + .setConfiguration(ctx.getEdgeMsgConstructor().constructEdgeConfiguration(edge)).build(); ResponseMsg edgeConfigMsg = ResponseMsg.newBuilder() .setEdgeUpdateMsg(edgeConfig) .build(); @@ -509,6 +508,8 @@ public final class EdgeGrpcSession implements Closeable { UpdateMsgType msgType = getResponseMsgType(edgeEvent.getAction()); log.trace("Executing processEntityMessage, edgeEvent [{}], action [{}], msgType [{}]", edgeEvent, action, msgType); switch (edgeEvent.getType()) { + case EDGE: + return ctx.getEdgeProcessor().processEdgeToEdge(edgeEvent, action); case DEVICE: return ctx.getDeviceProcessor().processDeviceToEdge(edge, edgeEvent, msgType, action); case DEVICE_PROFILE: @@ -664,7 +665,7 @@ public final class EdgeGrpcSession implements Closeable { return ConnectResponseMsg.newBuilder() .setResponseCode(ConnectResponseCode.ACCEPTED) .setErrorMsg("") - .setConfiguration(constructEdgeConfigProto(edge)).build(); + .setConfiguration(ctx.getEdgeMsgConstructor().constructEdgeConfiguration(edge)).build(); } return ConnectResponseMsg.newBuilder() .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS) @@ -684,26 +685,6 @@ public final class EdgeGrpcSession implements Closeable { .setConfiguration(EdgeConfiguration.getDefaultInstance()).build(); } - private EdgeConfiguration constructEdgeConfigProto(Edge edge) { - EdgeConfiguration.Builder builder = EdgeConfiguration.newBuilder() - .setEdgeIdMSB(edge.getId().getId().getMostSignificantBits()) - .setEdgeIdLSB(edge.getId().getId().getLeastSignificantBits()) - .setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits()) - .setName(edge.getName()) - .setType(edge.getType()) - .setRoutingKey(edge.getRoutingKey()) - .setSecret(edge.getSecret()) - .setAdditionalInfo(JacksonUtil.toString(edge.getAdditionalInfo())) - .setCloudType("CE"); - if (edge.getCustomerId() != null) { - builder.setCustomerIdMSB(edge.getCustomerId().getId().getMostSignificantBits()) - .setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits()); - } - return builder - .build(); - } - @Override public void close() { log.debug("[{}] Closing session", sessionId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EdgeMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EdgeMsgConstructor.java new file mode 100644 index 0000000000..5501217578 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EdgeMsgConstructor.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor; + +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; +import org.thingsboard.server.queue.util.TbCoreComponent; + +@Component +@TbCoreComponent +public class EdgeMsgConstructor { + + public EdgeConfiguration constructEdgeConfiguration(Edge edge) { + EdgeConfiguration.Builder builder = EdgeConfiguration.newBuilder() + .setEdgeIdMSB(edge.getId().getId().getMostSignificantBits()) + .setEdgeIdLSB(edge.getId().getId().getLeastSignificantBits()) + .setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits()) + .setName(edge.getName()) + .setType(edge.getType()) + .setRoutingKey(edge.getRoutingKey()) + .setSecret(edge.getSecret()) + .setAdditionalInfo(JacksonUtil.toString(edge.getAdditionalInfo())) + .setCloudType("CE"); + if (edge.getCustomerId() != null) { + builder.setCustomerIdMSB(edge.getCustomerId().getId().getMostSignificantBits()) + .setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits()); + } + return builder.build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 1867b11bc4..c1752baaa3 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -62,6 +62,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.CustomerMsgConstructo import org.thingsboard.server.service.edge.rpc.constructor.DashboardMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.DeviceMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.DeviceProfileMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.EdgeMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.EntityDataMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.EntityViewMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.OtaPackageMsgConstructor; @@ -159,6 +160,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected DataValidator deviceValidator; + @Autowired + protected EdgeMsgConstructor edgeMsgConstructor; + @Autowired protected EntityDataMsgConstructor entityDataMsgConstructor; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java index 73fa828e43..4e0e962c8b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java @@ -15,43 +15,39 @@ */ package org.thingsboard.server.service.edge.rpc.processor; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; import org.thingsboard.server.queue.util.TbCoreComponent; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - @Component @Slf4j @TbCoreComponent public class EdgeProcessor extends BaseEdgeProcessor { - public ListenableFuture processEdgeNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { - try { - EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); - switch (actionType) { - // TODO: add support for edge update - default: - return Futures.immediateFuture(null); - } - } catch (Exception e) { - log.error("Exception during processing edge event", e); - return Futures.immediateFailedFuture(e); + public DownlinkMsg processEdgeToEdge(EdgeEvent edgeEvent, EdgeEventActionType action) { + EdgeId edgeId = new EdgeId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (action) { + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + Edge edge = edgeService.findEdgeById(edgeEvent.getTenantId(), edgeId); + if (edge != null) { + EdgeConfiguration edgeConfigMsg = + edgeMsgConstructor.constructEdgeConfiguration(edge); + downlinkMsg = DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .setEdgeConfiguration(edgeConfigMsg) + .build(); + } + break; } + return downlinkMsg; } } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index 8c4b62a137..f9b8281c46 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -211,8 +211,6 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS lifecycleEvent = ComponentLifecycleEvent.CREATED; break; case UPDATED: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: lifecycleEvent = ComponentLifecycleEvent.UPDATED; break; case DELETED: diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/edge/DefaultTbEdgeService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/edge/DefaultTbEdgeService.java index 13591baa32..697fe35a42 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/edge/DefaultTbEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/edge/DefaultTbEdgeService.java @@ -82,11 +82,12 @@ public class DefaultTbEdgeService extends AbstractTbEntityService implements TbE @Override public Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, Customer customer, User user) throws ThingsboardException { + ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER; CustomerId customerId = customer.getId(); try { Edge savedEdge = checkNotNull(edgeService.assignEdgeToCustomer(tenantId, edgeId, customerId)); - notificationEntityService.notifyCreateOrUpdateOrDeleteEdge(tenantId, edgeId, customerId, savedEdge, ActionType.ASSIGNED_TO_CUSTOMER, user, - edgeId.toString(), customerId.toString(), customer.getName()); + notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, edgeId, customerId, savedEdge, + actionType, user, true, edgeId.toString(), customerId.toString(), customer.getName()); return savedEdge; } catch (Exception e) { @@ -98,14 +99,14 @@ public class DefaultTbEdgeService extends AbstractTbEntityService implements TbE @Override public Edge unassignEdgeFromCustomer(Edge edge, Customer customer, User user) throws ThingsboardException { + ActionType actionType = ActionType.UNASSIGNED_FROM_CUSTOMER; TenantId tenantId = edge.getTenantId(); EdgeId edgeId = edge.getId(); CustomerId customerId = customer.getId(); try { Edge savedEdge = checkNotNull(edgeService.unassignEdgeFromCustomer(tenantId, edgeId)); - - notificationEntityService.notifyCreateOrUpdateOrDeleteEdge(tenantId, edgeId, customerId, savedEdge, ActionType.UNASSIGNED_FROM_CUSTOMER, user, - edgeId.toString(), customerId.toString(), customer.getName()); + notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, edgeId, customerId, savedEdge, + actionType, user, true, edgeId.toString(), customerId.toString(), customer.getName()); return savedEdge; } catch (Exception e) { notificationEntityService.logEntityAction(tenantId, emptyId(EntityType.EDGE), diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index 1c0436d329..d5099b8617 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -289,6 +289,10 @@ public class EdgeImitator { result.add(saveDownlinkMsg(queueUpdateMsg)); } } + if (downlinkMsg.hasEdgeConfiguration()) { + result.add(saveDownlinkMsg(downlinkMsg.getEdgeConfiguration())); + } + return Futures.allAsList(result); } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 06ca2e33a5..45af2368e7 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -535,5 +535,6 @@ message DownlinkMsg { repeated DeviceRpcCallMsg deviceRpcCallMsg = 21; repeated OtaPackageUpdateMsg otaPackageUpdateMsg = 22; repeated QueueUpdateMsg queueUpdateMsg = 23; + EdgeConfiguration edgeConfiguration = 24; }