Browse Source

Added Edge gRPC session

pull/2818/head
Volodymyr Babak 7 years ago
parent
commit
a770a59bf0
  1. 14
      application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java
  2. 42
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
  3. 130
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
  4. 29
      common/edge-api/src/main/java/org/thingsboard/edge/exception/EdgeConnectionException.java
  5. 80
      common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java
  6. 19
      common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java
  7. 77
      common/edge-api/src/main/proto/edge.proto

14
application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java

@ -0,0 +1,14 @@
package org.thingsboard.server.service.edge;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.server.dao.edge.EdgeService;
@Component
@Data
public class EdgeContextComponent {
@Autowired
private EdgeService edgeService;
}

42
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -20,22 +20,30 @@ import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.edge.gen.EdgeProtos;
import org.thingsboard.server.common.edge.gen.EdgeRpcServiceGrpc;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.RequestMsg;
import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
@ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true")
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
private final Map<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
@Value("${edges.rpc.port}")
private int rpcPort;
@Value("${edges.rpc.ssl.enabled}")
@ -45,6 +53,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
@Value("${edges.rpc.ssl.privateKey}")
private String privateKeyResource;
@Autowired
private EdgeContextComponent ctx;
private Server server;
@PostConstruct
@ -81,24 +92,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
}
@Override
public StreamObserver<EdgeProtos.UplinkMsg> sendUplink(StreamObserver<EdgeProtos.DownlinkMsg> responseObserver) {
log.info("sendUplink [{}]", responseObserver);
return new StreamObserver<EdgeProtos.UplinkMsg>() {
@Override
public void onNext(EdgeProtos.UplinkMsg uplinkMsg) {
log.info("onNext [{}]", uplinkMsg);
}
public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> responseObserver) {
return new EdgeGrpcSession(ctx, responseObserver, this::onEdgeConnect, this::onEdgeDisconnect).getInputStream();
}
@Override
public void onError(Throwable throwable) {
log.info("onError", throwable);
}
private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
sessions.put(edgeId, edgeGrpcSession);
}
@Override
public void onCompleted() {
log.info("onCompleted");
}
};
private void onEdgeDisconnect(EdgeId edgeId) {
sessions.remove(edgeId);
}
}

130
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java

@ -0,0 +1,130 @@
package org.thingsboard.server.service.edge.rpc;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.ConnectRequestMsg;
import org.thingsboard.server.gen.edge.ConnectResponseCode;
import org.thingsboard.server.gen.edge.ConnectResponseMsg;
import org.thingsboard.server.gen.edge.EdgeConfigurationProto;
import org.thingsboard.server.gen.edge.RequestMsg;
import org.thingsboard.server.gen.edge.RequestMsgType;
import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.gen.edge.UplinkMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@Slf4j
@Data
public final class EdgeGrpcSession implements Cloneable {
private final UUID sessionId;
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
private final Consumer<EdgeId> sessionCloseListener;
private EdgeContextComponent ctx;
private Edge edge;
private StreamObserver<RequestMsg> inputStream;
private StreamObserver<ResponseMsg> outputStream;
private boolean connected;
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream
, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener
, Consumer<EdgeId> sessionCloseListener) {
this.sessionId = UUID.randomUUID();
this.ctx = ctx;
this.outputStream = outputStream;
this.sessionOpenListener = sessionOpenListener;
this.sessionCloseListener = sessionCloseListener;
initInputStream();
}
private void initInputStream() {
this.inputStream = new StreamObserver<RequestMsg>() {
@Override
public void onNext(RequestMsg requestMsg) {
if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) {
ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg());
outputStream.onNext(ResponseMsg.newBuilder()
.setConnectResponseMsg(responseMsg)
.build());
if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) {
outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
}
}
if (connected) {
if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE) && requestMsg.hasUplinkMsg()) {
outputStream.onNext(ResponseMsg.newBuilder()
.setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg()))
.build());
}
}
}
@Override
public void onError(Throwable t) {
log.error("Failed to deliver message from client!", t);
}
@Override
public void onCompleted() {
sessionCloseListener.accept(edge.getId());
outputStream.onCompleted();
}
};
}
private UplinkResponseMsg processUplinkMsg(UplinkMsg uplinkMsg) {
return null;
}
private ConnectResponseMsg processConnect(ConnectRequestMsg request) {
Optional<Edge> optional = ctx.getEdgeService().findEdgeByRoutingKey(TenantId.SYS_TENANT_ID, request.getEdgeRoutingKey());
if (optional.isPresent()) {
edge = optional.get();
try {
if (edge.getSecret().equals(request.getEdgeSecret())) {
connected = true;
sessionOpenListener.accept(edge.getId(), this);
return ConnectResponseMsg.newBuilder()
.setResponseCode(ConnectResponseCode.ACCEPTED)
.setErrorMsg("")
.setConfiguration(constructEdgeConfigProto(edge)).build();
}
return ConnectResponseMsg.newBuilder()
.setResponseCode(ConnectResponseCode.BAD_CREDENTIALS)
.setErrorMsg("Failed to validate the edge!")
.setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build();
} catch (Exception e) {
log.error("[{}] Failed to process edge connection!", request.getEdgeRoutingKey(), e);
return ConnectResponseMsg.newBuilder()
.setResponseCode(ConnectResponseCode.SERVER_UNAVAILABLE)
.setErrorMsg("Failed to process edge connection!")
.setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build();
}
}
return ConnectResponseMsg.newBuilder()
.setResponseCode(ConnectResponseCode.BAD_CREDENTIALS)
.setErrorMsg("Failed to find the edge! Routing key: " + request.getEdgeRoutingKey())
.setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build();
}
private EdgeConfigurationProto constructEdgeConfigProto(Edge edge) throws JsonProcessingException {
return EdgeConfigurationProto.newBuilder()
.setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits())
.setName(edge.getName())
.setRoutingKey(edge.getRoutingKey())
.setType(edge.getType().toString())
.build();
}
}

29
common/edge-api/src/main/java/org/thingsboard/edge/exception/EdgeConnectionException.java

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.edge.exception;
public class EdgeConnectionException extends RuntimeException {
private static final long serialVersionUID = -4372754681230555723L;
public EdgeConnectionException(String message) {
super(message);
}
public EdgeConnectionException(String message, Throwable cause) {
super(message, cause);
}
}

80
common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java

@ -23,12 +23,24 @@ import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.edge.gen.EdgeProtos;
import org.thingsboard.server.common.edge.gen.EdgeRpcServiceGrpc;
import org.thingsboard.edge.exception.EdgeConnectionException;
import org.thingsboard.server.gen.edge.CloudDownlinkDataProto;
import org.thingsboard.server.gen.edge.ConnectRequestMsg;
import org.thingsboard.server.gen.edge.ConnectResponseCode;
import org.thingsboard.server.gen.edge.ConnectResponseMsg;
import org.thingsboard.server.gen.edge.EdgeConfigurationProto;
import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.RequestMsg;
import org.thingsboard.server.gen.edge.RequestMsgType;
import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.gen.edge.UplinkMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
import javax.net.ssl.SSLException;
import java.io.File;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@Service
@Slf4j
@ -47,10 +59,15 @@ public class EdgeGrpcClient implements EdgeRpcClient {
private ManagedChannel channel;
private StreamObserver<EdgeProtos.UplinkMsg> inputStream;
private StreamObserver<RequestMsg> inputStream;
@Override
public void connect() {
public void connect(String edgeKey,
String edgeSecret,
Consumer<UplinkResponseMsg> onUplinkResponse,
Consumer<EdgeConfigurationProto> onEdgeUpdate,
Consumer<CloudDownlinkDataProto> onDownlink,
Consumer<Exception> onError) {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext();
if (sslEnabled) {
try {
@ -62,23 +79,62 @@ public class EdgeGrpcClient implements EdgeRpcClient {
}
channel = builder.build();
EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel);
StreamObserver<EdgeProtos.DownlinkMsg> responseObserver = new StreamObserver<EdgeProtos.DownlinkMsg>() {
log.info("[{}] Sending a connect request to the TB!", edgeKey);
this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDownlink, onError));
this.inputStream.onNext(RequestMsg.newBuilder()
.setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE)
.setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build())
.build());
}
@Override
public void disconnect() throws InterruptedException {
inputStream.onCompleted();
if (channel != null) {
channel.shutdown().awaitTermination(timeoutSecs, TimeUnit.SECONDS);
}
}
@Override
public void sendUplinkMsg(UplinkMsg msg) {
this.inputStream.onNext(RequestMsg.newBuilder()
.setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE)
.setUplinkMsg(msg)
.build());
}
private StreamObserver<ResponseMsg> initOutputStream(String edgeKey, Consumer<UplinkResponseMsg> onUplinkResponse, Consumer<EdgeConfigurationProto> onEdgeUpdate, Consumer<CloudDownlinkDataProto> onDownlink, Consumer<Exception> onError) {
return new StreamObserver<ResponseMsg>() {
@Override
public void onNext(EdgeProtos.DownlinkMsg downlinkMsg) {
log.info("onNext [{}]", downlinkMsg);
public void onNext(ResponseMsg responseMsg) {
if (responseMsg.hasConnectResponseMsg()) {
ConnectResponseMsg connectResponseMsg = responseMsg.getConnectResponseMsg();
if (connectResponseMsg.getResponseCode().equals(ConnectResponseCode.ACCEPTED)) {
log.info("[{}] Configuration received: {}", edgeKey, connectResponseMsg.getConfiguration());
onEdgeUpdate.accept(connectResponseMsg.getConfiguration());
} else {
log.error("[{}] Failed to establish the connection! Code: {}. Error message: {}.", edgeKey, connectResponseMsg.getResponseCode(), connectResponseMsg.getErrorMsg());
onError.accept(new EdgeConnectionException("Failed to establish the connection! Response code: " + connectResponseMsg.getResponseCode().name()));
}
} else if (responseMsg.hasUplinkResponseMsg()) {
log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg());
onUplinkResponse.accept(responseMsg.getUplinkResponseMsg());
} else if (responseMsg.hasDownlinkMsg()) {
log.debug("[{}] Downlink message received for device {}", edgeKey, responseMsg.getDownlinkMsg().getCloudData().getDeviceName());
onDownlink.accept(responseMsg.getDownlinkMsg().getCloudData());
}
}
@Override
public void onError(Throwable throwable) {
public void onError(Throwable t) {
log.debug("[{}] The rpc session received an error!", edgeKey, t);
onError.accept(new RuntimeException(t));
}
@Override
public void onCompleted() {
log.debug("[{}] The rpc session was closed!", edgeKey);
}
};
inputStream = stub.sendUplink(responseObserver);
inputStream.onNext(EdgeProtos.UplinkMsg.newBuilder().setMsgType(EdgeProtos.UplinkMsgType.DELETE_DEVICE_MESSAGE).build());
}
}

19
common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java

@ -15,7 +15,24 @@
*/
package org.thingsboard.edge.rpc;
import org.thingsboard.server.gen.edge.CloudDownlinkDataProto;
import org.thingsboard.server.gen.edge.EdgeConfigurationProto;
import org.thingsboard.server.gen.edge.UplinkMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
import java.util.function.Consumer;
public interface EdgeRpcClient {
void connect();
void connect(String integrationKey,
String integrationSecret,
Consumer<UplinkResponseMsg> onUplinkResponse,
Consumer<EdgeConfigurationProto> onEdgeUpdate,
Consumer<CloudDownlinkDataProto> onDownlink,
Consumer<Exception> onError);
void disconnect() throws InterruptedException;
void sendUplinkMsg(UplinkMsg uplinkMsg) throws InterruptedException;
}

77
common/edge-api/src/main/proto/edge.proto

@ -15,35 +15,90 @@
*/
syntax = "proto3";
option java_package = "org.thingsboard.server.common.edge.gen";
option java_package = "org.thingsboard.server.gen.edge";
option java_multiple_files = true;
option java_outer_classname = "EdgeProtos";
package edge;
// Interface exported by the ThingsBoard PRC Edge.
// Interface exported by the ThingsBoard Edge Transport.
service EdgeRpcService {
rpc sendUplink(stream UplinkMsg) returns (stream DownlinkMsg) {}
rpc handleMsgs(stream RequestMsg) returns (stream ResponseMsg) {}
}
/**
* Data Structures;
*/
message UplinkMsg {
UplinkMsgType msgType = 1;
message RequestMsg {
RequestMsgType msgType = 1;
ConnectRequestMsg connectRequestMsg = 2;
UplinkMsg uplinkMsg = 3;
}
message DownlinkMsg {
DownlinkMsgType msgType = 1;
message ResponseMsg {
ResponseMsgType msgType = 1;
ConnectResponseMsg connectResponseMsg = 2;
UplinkResponseMsg uplinkResponseMsg = 3;
DownlinkMsg downlinkMsg = 4;
}
enum RequestMsgType {
CONNECT_RPC_MESSAGE = 0;
UPLINK_RPC_MESSAGE = 1;
}
message ConnectRequestMsg {
string edgeRoutingKey = 1;
string edgeSecret = 2;
}
enum ConnectResponseCode {
ACCEPTED = 0;
BAD_CREDENTIALS = 1;
SERVER_UNAVAILABLE = 2;
}
enum UplinkMsgType {
SAVE_DEVICE_MESSAGE = 0;
DELETE_DEVICE_MESSAGE = 1;
message ConnectResponseMsg {
ConnectResponseCode responseCode = 1;
string errorMsg = 2;
EdgeConfigurationProto configuration = 3;
}
enum DownlinkMsgType {
message EdgeConfigurationProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string name = 5;
string routingKey = 6;
string type = 7;
}
enum ResponseMsgType {
SAVE_ENTITY_MESSAGE = 0;
DELETE_ENTITY_MESSAGE = 1;
}
message CloudDownlinkDataProto {
string deviceName = 1;
string deviceType = 2;
bytes tbMsg = 3;
}
/**
* Main Messages;
*/
message UplinkMsg {
int32 uplinkMsgId = 1;
}
message UplinkResponseMsg {
bool success = 1;
string errorMsg = 2;
}
message DownlinkMsg {
CloudDownlinkDataProto cloudData = 1;
}

Loading…
Cancel
Save