Browse Source

Introduced Edge Lifecycle. Update Edge session configuration

pull/2436/head
Volodymyr Babak 6 years ago
parent
commit
6182b622dd
  1. 16
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 17
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  3. 9
      application/src/main/java/org/thingsboard/server/controller/EdgeController.java
  4. 20
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
  5. 14
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
  6. 41
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java

16
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -66,6 +66,7 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
@ -254,15 +255,14 @@ public class ActorSystemContext {
@Getter
private TbCoreDeviceRpcService tbCoreDeviceRpcService;
@Lazy
@Autowired
@Getter
private EdgeService edgeService;
@Autowired(required = false)
@Getter private EdgeService edgeService;
@Lazy
@Autowired
@Getter
private EdgeEventService edgeEventService;
@Autowired(required = false)
@Getter private EdgeEventService edgeEventService;
@Autowired(required = false)
@Getter private EdgeRpcService edgeRpcService;
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
@Getter

17
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -31,10 +31,13 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.msg.MsgType;
@ -47,6 +50,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import java.util.List;
import java.util.Optional;
@ -202,7 +206,18 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
if (isRuleEngineForCurrentTenant) {
if (msg.getEntityId().getEntityType() == EntityType.EDGE) {
EdgeId edgeId = new EdgeId(msg.getEntityId().getId());
EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService();
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
edgeRpcService.deleteEdge(edgeId);
} else {
Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId);
if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) {
edgeRpcService.updateEdge(edge);
}
}
} else if (isRuleEngineForCurrentTenant) {
TbActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {

9
application/src/main/java/org/thingsboard/server/controller/EdgeController.java

@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@ -101,6 +102,9 @@ public class EdgeController extends BaseController {
edgeService.assignDefaultRuleChainsToEdge(tenantId, savedEdge.getId());
}
tbClusterService.onEntityStateChange(savedEdge.getTenantId(), savedEdge.getId(),
created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
logEntityAction(savedEdge.getId(), savedEdge, null, created ? ActionType.ADDED : ActionType.UPDATED, null);
return savedEdge;
} catch (Exception e) {
@ -120,6 +124,9 @@ public class EdgeController extends BaseController {
Edge edge = checkEdgeId(edgeId, Operation.DELETE);
edgeService.deleteEdge(getTenantId(), edgeId);
tbClusterService.onEntityStateChange(getTenantId(), edgeId,
ComponentLifecycleEvent.DELETED);
logEntityAction(edgeId, edge,
null,
ActionType.DELETED, null, strEdgeId);
@ -284,6 +291,8 @@ public class EdgeController extends BaseController {
Edge updatedEdge = edgeNotificationService.setEdgeRootRuleChain(getTenantId(), edge, ruleChainId);
tbClusterService.onEntityStateChange(updatedEdge.getTenantId(), updatedEdge.getId(), ComponentLifecycleEvent.UPDATED);
logEntityAction(updatedEdge.getId(), updatedEdge, null, ActionType.UPDATED, null);
return updatedEdge;

20
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java

@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
@ -52,7 +53,7 @@ import java.util.concurrent.Executors;
@Service
@Slf4j
@ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true")
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
private final Map<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
private static final ObjectMapper mapper = new ObjectMapper();
@ -117,6 +118,23 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper).getInputStream();
}
@Override
public void updateEdge(Edge edge) {
EdgeGrpcSession session = sessions.get(edge.getId());
if (session != null && session.isConnected()) {
session.onConfigurationUpdate(edge);
}
}
@Override
public void deleteEdge(EdgeId edgeId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
session.close();
sessions.remove(edgeId);
}
}
private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
sessions.put(edgeId, edgeGrpcSession);
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true);

14
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java

@ -192,6 +192,20 @@ public final class EdgeGrpcSession implements Closeable {
};
}
void onConfigurationUpdate(Edge edge) {
try {
this.edge = edge;
// TODO: voba - push edge configuration update to edge
// outputStream.onNext(org.thingsboard.server.gen.integration.ResponseMsg.newBuilder()
// .setIntegrationUpdateMsg(IntegrationUpdateMsg.newBuilder()
// .setConfiguration(constructIntegrationConfigProto(configuration, defaultConverterProto, downLinkConverterProto))
// .build())
// .build());
} catch (Exception e) {
log.error("Failed to construct proto objects!", e);
}
}
void processHandleMessages() throws ExecutionException, InterruptedException {
Long queueStartTs = getQueueStartTs().get();
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true);

41
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java

@ -0,0 +1,41 @@
/**
* ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
*
* Copyright © 2016-2020 ThingsBoard, Inc. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of ThingsBoard, Inc. and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to ThingsBoard, Inc.
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
*
* Dissemination of this information or reproduction of this material is strictly forbidden
* unless prior written permission is obtained from COMPANY.
*
* Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
* managers or contractors who have executed Confidentiality and Non-disclosure agreements
* explicitly covering such access.
*
* The copyright notice above does not evidence any actual or intended publication
* or disclosure of this source code, which includes
* information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
* ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
* OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
* THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
* AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
* THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
* DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
* OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
*/
package org.thingsboard.server.service.edge.rpc;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
public interface EdgeRpcService {
void updateEdge(Edge edge);
void deleteEdge(EdgeId edgeId);
}
Loading…
Cancel
Save