Browse Source

Updated edge update events handling - assign/usassign to/from customer

pull/7395/head
Volodymyr Babak 4 years ago
parent
commit
719940fbc3
  1. 6
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  2. 6
      application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java
  3. 8
      application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java
  4. 27
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
  5. 46
      application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EdgeMsgConstructor.java
  6. 4
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java
  7. 46
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java
  8. 2
      application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java
  9. 11
      application/src/main/java/org/thingsboard/server/service/entitiy/edge/DefaultTbEdgeService.java
  10. 4
      application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java
  11. 1
      common/edge-api/src/main/proto/edge.proto

6
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -242,11 +242,9 @@ public class TenantActor extends RuleChainManagerActor {
EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService();
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
edgeRpcService.deleteEdge(tenantId, edgeId);
} else {
} else if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) {
Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId);
if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) {
edgeRpcService.updateEdge(tenantId, edge);
}
edgeRpcService.updateEdge(tenantId, edge);
}
} else if (isRuleEngine) {
TbActorRef target = getEntityActorRef(msg.getEntityId());

6
application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java

@ -40,7 +40,6 @@ import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.EntityEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor;
@ -64,9 +63,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
@Autowired
private TbClusterService clusterService;
@Autowired
private EdgeProcessor edgeProcessor;
@Autowired
private EntityEdgeProcessor entityProcessor;
@ -124,8 +120,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
ListenableFuture<Void> future;
switch (type) {
case EDGE:
future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg);
break;
case ASSET:
case DEVICE:
case ENTITY_VIEW:

8
application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java

@ -36,6 +36,7 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
import org.thingsboard.server.service.edge.rpc.constructor.EdgeMsgConstructor;
import org.thingsboard.server.service.edge.rpc.processor.AdminSettingsEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.AssetEdgeProcessor;
@ -43,6 +44,7 @@ import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.DashboardEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.DeviceEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.EntityEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.EntityViewEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.OtaPackageEdgeProcessor;
@ -117,6 +119,9 @@ public class EdgeContextComponent {
@Autowired
private DeviceProfileEdgeProcessor deviceProfileProcessor;
@Autowired
private EdgeProcessor edgeProcessor;
@Autowired
private DeviceEdgeProcessor deviceProcessor;
@ -162,6 +167,9 @@ public class EdgeContextComponent {
@Autowired
private QueueEdgeProcessor queueEdgeProcessor;
@Autowired
private EdgeMsgConstructor edgeMsgConstructor;
@Autowired
private EdgeEventStorageSettings edgeEventStorageSettings;

27
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java

@ -24,7 +24,6 @@ import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge;
@ -289,7 +288,7 @@ public final class EdgeGrpcSession implements Closeable {
log.debug("[{}] onConfigurationUpdate [{}]", this.sessionId, edge);
this.edge = edge;
EdgeUpdateMsg edgeConfig = EdgeUpdateMsg.newBuilder()
.setConfiguration(constructEdgeConfigProto(edge)).build();
.setConfiguration(ctx.getEdgeMsgConstructor().constructEdgeConfiguration(edge)).build();
ResponseMsg edgeConfigMsg = ResponseMsg.newBuilder()
.setEdgeUpdateMsg(edgeConfig)
.build();
@ -509,6 +508,8 @@ public final class EdgeGrpcSession implements Closeable {
UpdateMsgType msgType = getResponseMsgType(edgeEvent.getAction());
log.trace("Executing processEntityMessage, edgeEvent [{}], action [{}], msgType [{}]", edgeEvent, action, msgType);
switch (edgeEvent.getType()) {
case EDGE:
return ctx.getEdgeProcessor().processEdgeToEdge(edgeEvent, action);
case DEVICE:
return ctx.getDeviceProcessor().processDeviceToEdge(edge, edgeEvent, msgType, action);
case DEVICE_PROFILE:
@ -664,7 +665,7 @@ public final class EdgeGrpcSession implements Closeable {
return ConnectResponseMsg.newBuilder()
.setResponseCode(ConnectResponseCode.ACCEPTED)
.setErrorMsg("")
.setConfiguration(constructEdgeConfigProto(edge)).build();
.setConfiguration(ctx.getEdgeMsgConstructor().constructEdgeConfiguration(edge)).build();
}
return ConnectResponseMsg.newBuilder()
.setResponseCode(ConnectResponseCode.BAD_CREDENTIALS)
@ -684,26 +685,6 @@ public final class EdgeGrpcSession implements Closeable {
.setConfiguration(EdgeConfiguration.getDefaultInstance()).build();
}
private EdgeConfiguration constructEdgeConfigProto(Edge edge) {
EdgeConfiguration.Builder builder = EdgeConfiguration.newBuilder()
.setEdgeIdMSB(edge.getId().getId().getMostSignificantBits())
.setEdgeIdLSB(edge.getId().getId().getLeastSignificantBits())
.setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits())
.setName(edge.getName())
.setType(edge.getType())
.setRoutingKey(edge.getRoutingKey())
.setSecret(edge.getSecret())
.setAdditionalInfo(JacksonUtil.toString(edge.getAdditionalInfo()))
.setCloudType("CE");
if (edge.getCustomerId() != null) {
builder.setCustomerIdMSB(edge.getCustomerId().getId().getMostSignificantBits())
.setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits());
}
return builder
.build();
}
@Override
public void close() {
log.debug("[{}] Closing session", sessionId);

46
application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EdgeMsgConstructor.java

@ -0,0 +1,46 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.constructor;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.gen.edge.v1.EdgeConfiguration;
import org.thingsboard.server.queue.util.TbCoreComponent;
@Component
@TbCoreComponent
public class EdgeMsgConstructor {
public EdgeConfiguration constructEdgeConfiguration(Edge edge) {
EdgeConfiguration.Builder builder = EdgeConfiguration.newBuilder()
.setEdgeIdMSB(edge.getId().getId().getMostSignificantBits())
.setEdgeIdLSB(edge.getId().getId().getLeastSignificantBits())
.setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits())
.setName(edge.getName())
.setType(edge.getType())
.setRoutingKey(edge.getRoutingKey())
.setSecret(edge.getSecret())
.setAdditionalInfo(JacksonUtil.toString(edge.getAdditionalInfo()))
.setCloudType("CE");
if (edge.getCustomerId() != null) {
builder.setCustomerIdMSB(edge.getCustomerId().getId().getMostSignificantBits())
.setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits());
}
return builder.build();
}
}

4
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java

@ -62,6 +62,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.CustomerMsgConstructo
import org.thingsboard.server.service.edge.rpc.constructor.DashboardMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.DeviceMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.DeviceProfileMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.EdgeMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.EntityDataMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.EntityViewMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.OtaPackageMsgConstructor;
@ -159,6 +160,9 @@ public abstract class BaseEdgeProcessor {
@Autowired
protected DataValidator<Device> deviceValidator;
@Autowired
protected EdgeMsgConstructor edgeMsgConstructor;
@Autowired
protected EntityDataMsgConstructor entityDataMsgConstructor;

46
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java

@ -15,43 +15,39 @@
*/
package org.thingsboard.server.service.edge.rpc.processor;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EdgeConfiguration;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Component
@Slf4j
@TbCoreComponent
public class EdgeProcessor extends BaseEdgeProcessor {
public ListenableFuture<Void> processEdgeNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
try {
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
switch (actionType) {
// TODO: add support for edge update
default:
return Futures.immediateFuture(null);
}
} catch (Exception e) {
log.error("Exception during processing edge event", e);
return Futures.immediateFailedFuture(e);
public DownlinkMsg processEdgeToEdge(EdgeEvent edgeEvent, EdgeEventActionType action) {
EdgeId edgeId = new EdgeId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
Edge edge = edgeService.findEdgeById(edgeEvent.getTenantId(), edgeId);
if (edge != null) {
EdgeConfiguration edgeConfigMsg =
edgeMsgConstructor.constructEdgeConfiguration(edge);
downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.setEdgeConfiguration(edgeConfigMsg)
.build();
}
break;
}
return downlinkMsg;
}
}

2
application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java

@ -211,8 +211,6 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS
lifecycleEvent = ComponentLifecycleEvent.CREATED;
break;
case UPDATED:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
lifecycleEvent = ComponentLifecycleEvent.UPDATED;
break;
case DELETED:

11
application/src/main/java/org/thingsboard/server/service/entitiy/edge/DefaultTbEdgeService.java

@ -82,11 +82,12 @@ public class DefaultTbEdgeService extends AbstractTbEntityService implements TbE
@Override
public Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, Customer customer, User user) throws ThingsboardException {
ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER;
CustomerId customerId = customer.getId();
try {
Edge savedEdge = checkNotNull(edgeService.assignEdgeToCustomer(tenantId, edgeId, customerId));
notificationEntityService.notifyCreateOrUpdateOrDeleteEdge(tenantId, edgeId, customerId, savedEdge, ActionType.ASSIGNED_TO_CUSTOMER, user,
edgeId.toString(), customerId.toString(), customer.getName());
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, edgeId, customerId, savedEdge,
actionType, user, true, edgeId.toString(), customerId.toString(), customer.getName());
return savedEdge;
} catch (Exception e) {
@ -98,14 +99,14 @@ public class DefaultTbEdgeService extends AbstractTbEntityService implements TbE
@Override
public Edge unassignEdgeFromCustomer(Edge edge, Customer customer, User user) throws ThingsboardException {
ActionType actionType = ActionType.UNASSIGNED_FROM_CUSTOMER;
TenantId tenantId = edge.getTenantId();
EdgeId edgeId = edge.getId();
CustomerId customerId = customer.getId();
try {
Edge savedEdge = checkNotNull(edgeService.unassignEdgeFromCustomer(tenantId, edgeId));
notificationEntityService.notifyCreateOrUpdateOrDeleteEdge(tenantId, edgeId, customerId, savedEdge, ActionType.UNASSIGNED_FROM_CUSTOMER, user,
edgeId.toString(), customerId.toString(), customer.getName());
notificationEntityService.notifyAssignOrUnassignEntityToCustomer(tenantId, edgeId, customerId, savedEdge,
actionType, user, true, edgeId.toString(), customerId.toString(), customer.getName());
return savedEdge;
} catch (Exception e) {
notificationEntityService.logEntityAction(tenantId, emptyId(EntityType.EDGE),

4
application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java

@ -289,6 +289,10 @@ public class EdgeImitator {
result.add(saveDownlinkMsg(queueUpdateMsg));
}
}
if (downlinkMsg.hasEdgeConfiguration()) {
result.add(saveDownlinkMsg(downlinkMsg.getEdgeConfiguration()));
}
return Futures.allAsList(result);
}

1
common/edge-api/src/main/proto/edge.proto

@ -535,5 +535,6 @@ message DownlinkMsg {
repeated DeviceRpcCallMsg deviceRpcCallMsg = 21;
repeated OtaPackageUpdateMsg otaPackageUpdateMsg = 22;
repeated QueueUpdateMsg queueUpdateMsg = 23;
EdgeConfiguration edgeConfiguration = 24;
}

Loading…
Cancel
Save