From a770a59bf0f9fd2147406602be19a664d7643e2e Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 29 Oct 2019 19:21:53 +0200 Subject: [PATCH] Added Edge gRPC session --- .../service/edge/EdgeContextComponent.java | 14 ++ .../service/edge/rpc/EdgeGrpcService.java | 42 +++--- .../service/edge/rpc/EdgeGrpcSession.java | 130 ++++++++++++++++++ .../exception/EdgeConnectionException.java | 29 ++++ .../thingsboard/edge/rpc/EdgeGrpcClient.java | 80 +++++++++-- .../thingsboard/edge/rpc/EdgeRpcClient.java | 19 ++- common/edge-api/src/main/proto/edge.proto | 77 +++++++++-- 7 files changed, 347 insertions(+), 44 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java create mode 100644 common/edge-api/src/main/java/org/thingsboard/edge/exception/EdgeConnectionException.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java new file mode 100644 index 0000000000..377e12fbc9 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -0,0 +1,14 @@ +package org.thingsboard.server.service.edge; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.thingsboard.server.dao.edge.EdgeService; + +@Component +@Data +public class EdgeContextComponent { + + @Autowired + private EdgeService edgeService; +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 0844b2117b..e80e3115ef 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,22 +20,30 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.edge.gen.EdgeProtos; -import org.thingsboard.server.common.edge.gen.EdgeRpcServiceGrpc; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc; +import org.thingsboard.server.gen.edge.RequestMsg; +import org.thingsboard.server.gen.edge.ResponseMsg; +import org.thingsboard.server.service.edge.EdgeContextComponent; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.File; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @Service @Slf4j @ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true") public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { + private final Map sessions = new ConcurrentHashMap<>(); + @Value("${edges.rpc.port}") private int rpcPort; @Value("${edges.rpc.ssl.enabled}") @@ -45,6 +53,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { @Value("${edges.rpc.ssl.privateKey}") private String privateKeyResource; + @Autowired + private EdgeContextComponent ctx; + private Server server; @PostConstruct @@ -81,24 +92,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { } @Override - public StreamObserver sendUplink(StreamObserver responseObserver) { - log.info("sendUplink [{}]", responseObserver); - return new StreamObserver() { - - @Override - public void onNext(EdgeProtos.UplinkMsg uplinkMsg) { - log.info("onNext [{}]", uplinkMsg); - } + public StreamObserver handleMsgs(StreamObserver responseObserver) { + return new EdgeGrpcSession(ctx, responseObserver, this::onEdgeConnect, this::onEdgeDisconnect).getInputStream(); + } - @Override - public void onError(Throwable throwable) { - log.info("onError", throwable); - } + private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) { + sessions.put(edgeId, edgeGrpcSession); + } - @Override - public void onCompleted() { - log.info("onCompleted"); - } - }; + private void onEdgeDisconnect(EdgeId edgeId) { + sessions.remove(edgeId); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java new file mode 100644 index 0000000000..db395cbe83 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -0,0 +1,130 @@ +package org.thingsboard.server.service.edge.rpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.grpc.stub.StreamObserver; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.ConnectRequestMsg; +import org.thingsboard.server.gen.edge.ConnectResponseCode; +import org.thingsboard.server.gen.edge.ConnectResponseMsg; +import org.thingsboard.server.gen.edge.EdgeConfigurationProto; +import org.thingsboard.server.gen.edge.RequestMsg; +import org.thingsboard.server.gen.edge.RequestMsgType; +import org.thingsboard.server.gen.edge.ResponseMsg; +import org.thingsboard.server.gen.edge.UplinkMsg; +import org.thingsboard.server.gen.edge.UplinkResponseMsg; +import org.thingsboard.server.service.edge.EdgeContextComponent; + +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +@Slf4j +@Data +public final class EdgeGrpcSession implements Cloneable { + + private final UUID sessionId; + private final BiConsumer sessionOpenListener; + private final Consumer sessionCloseListener; + + private EdgeContextComponent ctx; + private Edge edge; + private StreamObserver inputStream; + private StreamObserver outputStream; + private boolean connected; + + EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream + , BiConsumer sessionOpenListener + , Consumer sessionCloseListener) { + this.sessionId = UUID.randomUUID(); + this.ctx = ctx; + this.outputStream = outputStream; + this.sessionOpenListener = sessionOpenListener; + this.sessionCloseListener = sessionCloseListener; + initInputStream(); + } + + private void initInputStream() { + this.inputStream = new StreamObserver() { + @Override + public void onNext(RequestMsg requestMsg) { + if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) { + ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg()); + outputStream.onNext(ResponseMsg.newBuilder() + .setConnectResponseMsg(responseMsg) + .build()); + if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) { + outputStream.onError(new RuntimeException(responseMsg.getErrorMsg())); + } + } + if (connected) { + if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE) && requestMsg.hasUplinkMsg()) { + outputStream.onNext(ResponseMsg.newBuilder() + .setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg())) + .build()); + } + } + } + + @Override + public void onError(Throwable t) { + log.error("Failed to deliver message from client!", t); + } + + @Override + public void onCompleted() { + sessionCloseListener.accept(edge.getId()); + outputStream.onCompleted(); + } + }; + } + + private UplinkResponseMsg processUplinkMsg(UplinkMsg uplinkMsg) { + return null; + } + + private ConnectResponseMsg processConnect(ConnectRequestMsg request) { + Optional optional = ctx.getEdgeService().findEdgeByRoutingKey(TenantId.SYS_TENANT_ID, request.getEdgeRoutingKey()); + if (optional.isPresent()) { + edge = optional.get(); + try { + if (edge.getSecret().equals(request.getEdgeSecret())) { + connected = true; + sessionOpenListener.accept(edge.getId(), this); + return ConnectResponseMsg.newBuilder() + .setResponseCode(ConnectResponseCode.ACCEPTED) + .setErrorMsg("") + .setConfiguration(constructEdgeConfigProto(edge)).build(); + } + return ConnectResponseMsg.newBuilder() + .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS) + .setErrorMsg("Failed to validate the edge!") + .setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build(); + } catch (Exception e) { + log.error("[{}] Failed to process edge connection!", request.getEdgeRoutingKey(), e); + return ConnectResponseMsg.newBuilder() + .setResponseCode(ConnectResponseCode.SERVER_UNAVAILABLE) + .setErrorMsg("Failed to process edge connection!") + .setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build(); + } + } + return ConnectResponseMsg.newBuilder() + .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS) + .setErrorMsg("Failed to find the edge! Routing key: " + request.getEdgeRoutingKey()) + .setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build(); + } + + private EdgeConfigurationProto constructEdgeConfigProto(Edge edge) throws JsonProcessingException { + return EdgeConfigurationProto.newBuilder() + .setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits()) + .setName(edge.getName()) + .setRoutingKey(edge.getRoutingKey()) + .setType(edge.getType().toString()) + .build(); + } +} diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/exception/EdgeConnectionException.java b/common/edge-api/src/main/java/org/thingsboard/edge/exception/EdgeConnectionException.java new file mode 100644 index 0000000000..816c878136 --- /dev/null +++ b/common/edge-api/src/main/java/org/thingsboard/edge/exception/EdgeConnectionException.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.edge.exception; + +public class EdgeConnectionException extends RuntimeException { + + private static final long serialVersionUID = -4372754681230555723L; + + public EdgeConnectionException(String message) { + super(message); + } + + public EdgeConnectionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java index 999a8a0e0f..fd18407b5c 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java @@ -23,12 +23,24 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.edge.gen.EdgeProtos; -import org.thingsboard.server.common.edge.gen.EdgeRpcServiceGrpc; +import org.thingsboard.edge.exception.EdgeConnectionException; +import org.thingsboard.server.gen.edge.CloudDownlinkDataProto; +import org.thingsboard.server.gen.edge.ConnectRequestMsg; +import org.thingsboard.server.gen.edge.ConnectResponseCode; +import org.thingsboard.server.gen.edge.ConnectResponseMsg; +import org.thingsboard.server.gen.edge.EdgeConfigurationProto; +import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc; +import org.thingsboard.server.gen.edge.RequestMsg; +import org.thingsboard.server.gen.edge.RequestMsgType; +import org.thingsboard.server.gen.edge.ResponseMsg; +import org.thingsboard.server.gen.edge.UplinkMsg; +import org.thingsboard.server.gen.edge.UplinkResponseMsg; import javax.net.ssl.SSLException; import java.io.File; import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; @Service @Slf4j @@ -47,10 +59,15 @@ public class EdgeGrpcClient implements EdgeRpcClient { private ManagedChannel channel; - private StreamObserver inputStream; + private StreamObserver inputStream; @Override - public void connect() { + public void connect(String edgeKey, + String edgeSecret, + Consumer onUplinkResponse, + Consumer onEdgeUpdate, + Consumer onDownlink, + Consumer onError) { NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext(); if (sslEnabled) { try { @@ -62,23 +79,62 @@ public class EdgeGrpcClient implements EdgeRpcClient { } channel = builder.build(); EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel); - StreamObserver responseObserver = new StreamObserver() { + log.info("[{}] Sending a connect request to the TB!", edgeKey); + this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDownlink, onError)); + this.inputStream.onNext(RequestMsg.newBuilder() + .setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE) + .setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build()) + .build()); + } + + @Override + public void disconnect() throws InterruptedException { + inputStream.onCompleted(); + if (channel != null) { + channel.shutdown().awaitTermination(timeoutSecs, TimeUnit.SECONDS); + } + } + + @Override + public void sendUplinkMsg(UplinkMsg msg) { + this.inputStream.onNext(RequestMsg.newBuilder() + .setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE) + .setUplinkMsg(msg) + .build()); + } + + private StreamObserver initOutputStream(String edgeKey, Consumer onUplinkResponse, Consumer onEdgeUpdate, Consumer onDownlink, Consumer onError) { + return new StreamObserver() { @Override - public void onNext(EdgeProtos.DownlinkMsg downlinkMsg) { - log.info("onNext [{}]", downlinkMsg); + public void onNext(ResponseMsg responseMsg) { + if (responseMsg.hasConnectResponseMsg()) { + ConnectResponseMsg connectResponseMsg = responseMsg.getConnectResponseMsg(); + if (connectResponseMsg.getResponseCode().equals(ConnectResponseCode.ACCEPTED)) { + log.info("[{}] Configuration received: {}", edgeKey, connectResponseMsg.getConfiguration()); + onEdgeUpdate.accept(connectResponseMsg.getConfiguration()); + } else { + log.error("[{}] Failed to establish the connection! Code: {}. Error message: {}.", edgeKey, connectResponseMsg.getResponseCode(), connectResponseMsg.getErrorMsg()); + onError.accept(new EdgeConnectionException("Failed to establish the connection! Response code: " + connectResponseMsg.getResponseCode().name())); + } + } else if (responseMsg.hasUplinkResponseMsg()) { + log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg()); + onUplinkResponse.accept(responseMsg.getUplinkResponseMsg()); + } else if (responseMsg.hasDownlinkMsg()) { + log.debug("[{}] Downlink message received for device {}", edgeKey, responseMsg.getDownlinkMsg().getCloudData().getDeviceName()); + onDownlink.accept(responseMsg.getDownlinkMsg().getCloudData()); + } } @Override - public void onError(Throwable throwable) { - + public void onError(Throwable t) { + log.debug("[{}] The rpc session received an error!", edgeKey, t); + onError.accept(new RuntimeException(t)); } @Override public void onCompleted() { - + log.debug("[{}] The rpc session was closed!", edgeKey); } }; - inputStream = stub.sendUplink(responseObserver); - inputStream.onNext(EdgeProtos.UplinkMsg.newBuilder().setMsgType(EdgeProtos.UplinkMsgType.DELETE_DEVICE_MESSAGE).build()); } } diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java index f627330e80..95e0dbdfd6 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java @@ -15,7 +15,24 @@ */ package org.thingsboard.edge.rpc; +import org.thingsboard.server.gen.edge.CloudDownlinkDataProto; +import org.thingsboard.server.gen.edge.EdgeConfigurationProto; +import org.thingsboard.server.gen.edge.UplinkMsg; +import org.thingsboard.server.gen.edge.UplinkResponseMsg; + +import java.util.function.Consumer; + public interface EdgeRpcClient { - void connect(); + void connect(String integrationKey, + String integrationSecret, + Consumer onUplinkResponse, + Consumer onEdgeUpdate, + Consumer onDownlink, + Consumer onError); + + + void disconnect() throws InterruptedException; + + void sendUplinkMsg(UplinkMsg uplinkMsg) throws InterruptedException; } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index f86fa96b4a..a2d3ce82f3 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -15,35 +15,90 @@ */ syntax = "proto3"; -option java_package = "org.thingsboard.server.common.edge.gen"; +option java_package = "org.thingsboard.server.gen.edge"; +option java_multiple_files = true; option java_outer_classname = "EdgeProtos"; package edge; -// Interface exported by the ThingsBoard PRC Edge. +// Interface exported by the ThingsBoard Edge Transport. service EdgeRpcService { - rpc sendUplink(stream UplinkMsg) returns (stream DownlinkMsg) {} + rpc handleMsgs(stream RequestMsg) returns (stream ResponseMsg) {} } /** * Data Structures; */ -message UplinkMsg { - UplinkMsgType msgType = 1; +message RequestMsg { + RequestMsgType msgType = 1; + ConnectRequestMsg connectRequestMsg = 2; + UplinkMsg uplinkMsg = 3; } -message DownlinkMsg { - DownlinkMsgType msgType = 1; +message ResponseMsg { + ResponseMsgType msgType = 1; + ConnectResponseMsg connectResponseMsg = 2; + UplinkResponseMsg uplinkResponseMsg = 3; + DownlinkMsg downlinkMsg = 4; +} + + +enum RequestMsgType { + CONNECT_RPC_MESSAGE = 0; + UPLINK_RPC_MESSAGE = 1; +} + +message ConnectRequestMsg { + string edgeRoutingKey = 1; + string edgeSecret = 2; +} + +enum ConnectResponseCode { + ACCEPTED = 0; + BAD_CREDENTIALS = 1; + SERVER_UNAVAILABLE = 2; } -enum UplinkMsgType { - SAVE_DEVICE_MESSAGE = 0; - DELETE_DEVICE_MESSAGE = 1; +message ConnectResponseMsg { + ConnectResponseCode responseCode = 1; + string errorMsg = 2; + EdgeConfigurationProto configuration = 3; } -enum DownlinkMsgType { +message EdgeConfigurationProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + string name = 5; + string routingKey = 6; + string type = 7; +} + +enum ResponseMsgType { SAVE_ENTITY_MESSAGE = 0; DELETE_ENTITY_MESSAGE = 1; } + +message CloudDownlinkDataProto { + string deviceName = 1; + string deviceType = 2; + bytes tbMsg = 3; +} + +/** + * Main Messages; + */ + +message UplinkMsg { + int32 uplinkMsgId = 1; +} + +message UplinkResponseMsg { + bool success = 1; + string errorMsg = 2; +} + +message DownlinkMsg { + CloudDownlinkDataProto cloudData = 1; +}