diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 7abcca2cab..c590104665 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,7 +32,6 @@ import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -44,18 +43,33 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; import org.thingsboard.server.common.msg.core.RuleEngineError; import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg; -import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; -import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.session.SessionMsgType; -import org.thingsboard.server.common.msg.session.SessionType; import org.thingsboard.server.common.msg.session.ToDeviceMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; @@ -76,8 +90,6 @@ import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.thingsboard.server.gen.transport.TransportProtos.*; - /** * @author Andrew Shvayka */ @@ -123,11 +135,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) { ToDeviceRpcRequest request = msg.getMsg(); ToDeviceRpcRequestBody body = request.getBody(); - ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg( - rpcSeq++, - body.getMethod(), - body.getParams() - ); + ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId( + rpcSeq++).setMethodName(body.getMethod()).setParams(body.getParams()).build(); long timeout = request.getExpirationTime() - System.currentTimeMillis(); if (timeout <= 0) { @@ -136,13 +145,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } boolean sent = rpcSubscriptions.size() > 0; - Set syncSessionSet = new HashSet<>(); + Set syncSessionSet = new HashSet<>(); rpcSubscriptions.entrySet().forEach(sub -> { -// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey()); -// sendMsgToSessionActor(response, sub.getValue().getServer()); -// if (SessionType.SYNC == sub.getValue().getType()) { -// syncSessionSet.add(sub.getKey()); -// } + sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId()); + if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) { + syncSessionSet.add(sub.getKey()); + } }); syncSessionSet.forEach(rpcSubscriptions::remove); @@ -175,10 +183,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional server) { + private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) { + TransportProtos.SessionType sessionType = getSessionType(sessionId); if (!toDeviceRpcPendingMap.isEmpty()) { logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); - if (type == SessionType.SYNC) { + if (sessionType == TransportProtos.SessionType.SYNC) { logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); } @@ -186,16 +195,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); } Set sentOneWayIds = new HashSet<>(); - if (type == SessionType.ASYNC) { - toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds)); + if (sessionType == TransportProtos.SessionType.ASYNC) { + toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds)); } else { - toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds)); + toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds)); } sentOneWayIds.forEach(toDeviceRpcPendingMap::remove); } - private Consumer> processPendingRpc(ActorContext context, SessionId sessionId, Optional server, Set sentOneWayIds) { + private Consumer> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set sentOneWayIds) { return entry -> { ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg(); ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); @@ -204,19 +213,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso sentOneWayIds.add(entry.getKey()); systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null)); } - ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg( - entry.getKey(), - body.getMethod(), - body.getParams() - ); -// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId); -// sendMsgToSessionActor(response, server); + ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId( + entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build(); + sendToTransport(rpcRequest, sessionId, nodeId); }; } void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) { TransportToDeviceActorMsg msg = wrapper.getMsg(); -// processRpcResponses(context, msg); if (msg.hasSessionEvent()) { processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); } @@ -237,15 +241,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (msg.hasGetAttributes()) { handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); } -// SessionMsgType sessionMsgType = msg.getPayload().getMsgType(); -// if (sessionMsgType.requiresRulesProcessing()) { -// switch (sessionMsgType) { -// case TO_SERVER_RPC_REQUEST: -// handleClientSideRPCRequest(context, msg); -// reportActivity(); -// break; -// } -// } + if (msg.hasToDeviceRPCCallResponse()) { + processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse()); + } + if (msg.hasToServerRPCCallRequest()) { + handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest()); + reportActivity(); + } } private void reportActivity() { @@ -314,36 +316,42 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } -// private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) { -// ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload(); -// -// JsonObject json = new JsonObject(); -// json.addProperty("method", request.getMethod()); -// json.add("params", jsonParser.parse(request.getParams())); -// -// TbMsgMetaData requestMetaData = defaultMetaData.copy(); -// requestMetaData.putValue("requestId", Integer.toString(request.getRequestId())); -// TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L); -// PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1); -// pushToRuleEngineWithTimeout(context, tbMsg, msgData); -// -// scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout()); -// toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress())); -// } + private void handleClientSideRPCRequest(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg request) { + UUID sessionId = getSessionId(sessionInfo); + JsonObject json = new JsonObject(); + json.addProperty("method", request.getMethodName()); + json.add("params", jsonParser.parse(request.getParams())); + + TbMsgMetaData requestMetaData = defaultMetaData.copy(); + requestMetaData.putValue("requestId", Integer.toString(request.getRequestId())); + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L); + context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self()); + + scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout()); + toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(sessionId, getSessionType(sessionId), sessionInfo.getNodeId())); + } - public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) { + private TransportProtos.SessionType getSessionType(UUID sessionId) { + return sessions.containsKey(sessionId) ? TransportProtos.SessionType.ASYNC : TransportProtos.SessionType.SYNC; + } + + void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) { ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId()); if (data != null) { logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId()); - ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT); -// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer()); + sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder() + .setRequestId(msg.getId()).setError("timeout").build() + , data.getSessionId(), data.getNodeId()); } } void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) { - ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId()); + int requestId = msg.getMsg().getRequestId(); + ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); if (data != null) { -// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer()); + sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder() + .setRequestId(requestId).setPayload(msg.getMsg().getData()).build() + , data.getSessionId(), data.getNodeId()); } } @@ -382,7 +390,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (hasNotificationData) { AttributeUpdateNotificationMsg finalNotification = notification.build(); attributeSubscriptions.entrySet().forEach(sub -> { - sendToTransport(finalNotification, sub.getKey(), sub.getValue()); + sendToTransport(finalNotification, sub.getKey(), sub.getValue().getNodeId()); }); } } else { @@ -390,6 +398,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } + private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) { + UUID sessionId = getSessionId(sessionInfo); + logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId); + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); + boolean success = requestMd != null; + if (success) { + systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), + requestMd.getMsg().getServerAddress(), responseMsg.getPayload(), null)); + } else { + logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); + } + } + // private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) { // SessionId sessionId = msg.getSessionId(); // FromDeviceMsg inMsg = msg.getPayload(); @@ -424,7 +445,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { - UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); attributeSubscriptions.remove(sessionId); @@ -438,8 +459,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } + private UUID getSessionId(SessionInfoProto sessionInfo) { + return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + } + private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) { - UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); @@ -450,11 +475,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); rpcSubscriptions.put(sessionId, session); + sendPendingRequests(context, sessionId, sessionInfo); } } private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { - UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + UUID sessionId = getSessionId(sessionInfo); if (msg.getEvent() == SessionEvent.OPEN) { logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { @@ -548,14 +574,31 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); } - private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) { + private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) { DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) .setAttributeUpdateNotification(notificationMsg).build(); - systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); + systemContext.getRuleEngineTransportService().process(nodeId, msg); } + private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) { + DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .setToDeviceRequest(rpcMsg).build(); + systemContext.getRuleEngineTransportService().process(nodeId, msg); + } + + private void sendToTransport(TransportProtos.ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) { + DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .setToServerResponse(rpcMsg).build(); + systemContext.getRuleEngineTransportService().process(nodeId, msg); + } + + private List toTsKvProtos(@Nullable List result) { List clientAttributes; if (result == null || result.isEmpty()) { diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java deleted file mode 100644 index dfa07cf065..0000000000 --- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.actors.device; - -import lombok.AllArgsConstructor; -import lombok.Data; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.session.SessionMsgType; - -import java.util.Optional; -import java.util.UUID; - -/** - * Created by ashvayka on 17.04.18. - */ -@Data -@AllArgsConstructor -public final class PendingSessionMsgData { - - private final UUID sessionId; - private final Optional serverAddress; - private final SessionMsgType sessionMsgType; - private final int requestId; - private final boolean replyOnQueueAck; - private int ackMsgCount; - -} diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java index f82a8c2481..669d94b17d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java @@ -16,18 +16,16 @@ package org.thingsboard.server.actors.device; import lombok.Data; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.session.SessionType; +import org.thingsboard.server.gen.transport.TransportProtos; -import java.util.Optional; +import java.util.UUID; /** * @author Andrew Shvayka */ @Data public class ToServerRpcRequestMetadata { - private final SessionId sessionId; - private final SessionType type; - private final Optional server; + private final UUID sessionId; + private final TransportProtos.SessionType type; + private final String nodeId; } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java index 17d4a1f42c..09d1bcb08f 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,8 +15,11 @@ */ package org.thingsboard.server.common.transport; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; /** * Created by ashvayka on 04.10.18. @@ -26,4 +29,10 @@ public interface SessionMsgListener { void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse); void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification); + + void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification); + + void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest); + + void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse); } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java index bf33b1d8a5..cd3041939b 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.common.transport; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; @@ -49,6 +51,10 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback callback); + + void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); + void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener); void deregisterSession(SessionInfoProto sessionInfo); diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java index 872dfaa03e..db7ee884dc 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.google.gson.JsonSyntaxException; +import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; @@ -95,6 +96,15 @@ public class JsonConverter { } } + public static JsonElement toJson(TransportProtos.ToDeviceRpcRequestMsg msg, boolean includeRequestId) { + JsonObject result = new JsonObject(); + if (includeRequestId) { + result.addProperty("id", msg.getRequestId()); + } + result.addProperty("method", msg.getMethodName()); + result.add("params", new JsonParser().parse(msg.getParams())); + return result; + } private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) { JsonObject jo = jsonObject.getAsJsonObject(); @@ -112,14 +122,14 @@ public class JsonConverter { request.addTsKvList(builder.build()); } - public static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) { + private static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) { TsKvListProto.Builder builder = TsKvListProto.newBuilder(); builder.setTs(jo.get("ts").getAsLong()); builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject())); request.addTsKvList(builder.build()); } - public static List parseProtoValues(JsonObject valuesObject) { + private static List parseProtoValues(JsonObject valuesObject) { List result = new ArrayList<>(); for (Entry valueEntry : valuesObject.entrySet()) { JsonElement element = valueEntry.getValue(); @@ -172,9 +182,9 @@ public class JsonConverter { return request; } - public static ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException { + public static TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException { JsonObject object = json.getAsJsonObject(); - return new ToServerRpcRequestMsg(requestId, object.get("method").getAsString(), GSON.toJson(object.get("params"))); + return TransportProtos.ToServerRpcRequestMsg.newBuilder().setRequestId(requestId).setMethodName(object.get("method").getAsString()).setParams(GSON.toJson(object.get("params"))).build(); } private static void parseObject(BasicTelemetryUploadRequest request, long systemTs, JsonElement jsonObject) { @@ -368,8 +378,14 @@ public class JsonConverter { return result; } - public static JsonElement toJson(ToServerRpcResponseMsg msg) { - return new JsonParser().parse(msg.getData()); + public static JsonElement toJson(TransportProtos.ToServerRpcResponseMsg msg) { + if (StringUtils.isEmpty(msg.getError())) { + return new JsonParser().parse(msg.getPayload()); + } else { + JsonObject errorMsg = new JsonObject(); + errorMsg.addProperty("error", msg.getError()); + return errorMsg; + } } public static JsonElement toErrorJson(String errorMsg) { diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto index 66c0d8183a..bcf5f529b9 100644 --- a/common/transport/src/main/proto/transport.proto +++ b/common/transport/src/main/proto/transport.proto @@ -137,6 +137,29 @@ message SubscribeToRPCMsg { bool unsubscribe = 1; } +message ToDeviceRpcRequestMsg { + int32 requestId = 1; + string methodName = 2; + string params = 3; +} + +message ToDeviceRpcResponseMsg { + int32 requestId = 1; + string payload = 2; +} + +message ToServerRpcRequestMsg { + int32 requestId = 1; + string methodName = 2; + string params = 3; +} + +message ToServerRpcResponseMsg { + int32 requestId = 1; + string payload = 2; + string error = 3; +} + message TransportToDeviceActorMsg { SessionInfoProto sessionInfo = 1; SessionEventMsg sessionEvent = 2; @@ -144,7 +167,9 @@ message TransportToDeviceActorMsg { PostAttributeMsg postAttributes = 4; GetAttributeRequestMsg getAttributes = 5; SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6; - SubscribeToRPCMsg subscribeToRPC= 7; + SubscribeToRPCMsg subscribeToRPC = 7; + ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8; + ToServerRpcRequestMsg toServerRPCCallRequest = 9; } message DeviceActorToTransportMsg { @@ -153,6 +178,8 @@ message DeviceActorToTransportMsg { SessionCloseNotificationProto sessionCloseNotification = 3; GetAttributeResponseMsg getAttributesResponse = 4; AttributeUpdateNotificationMsg attributeUpdateNotification = 5; + ToDeviceRpcRequestMsg toDeviceRequest = 6; + ToServerRpcResponseMsg toServerResponse = 7; } /** diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java index 1c9631129f..3cd414289c 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java @@ -98,7 +98,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { String payload = validatePayload(ctx, inbound); - return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0); +// return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0); + return null; } @Override @@ -225,8 +226,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { private Response convertToServerRpcResponse(SessionContext ctx, ToServerRpcResponseMsg msg) { if (msg.isSuccess()) { Response response = new Response(ResponseCode.CONTENT); - JsonElement result = JsonConverter.toJson(msg); - response.setPayload(result.toString()); +// JsonElement result = JsonConverter.toJson(msg); +// response.setPayload(result.toString()); return response; } else { return convertError(Optional.of(new RuntimeException("Server RPC response is empty!"))); diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java index e503409b23..5269285d25 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java @@ -114,7 +114,7 @@ public class HttpSessionCtx extends DeviceAwareSessionContext { } private void reply(ToServerRpcResponseMsg msg) { - responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); +// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); } private void reply(AttributesUpdateNotification msg) { diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index b1ee037dc3..3099b24b87 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -223,43 +223,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg); transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)){ + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg); + transportService.process(sessionInfo, rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)){ + TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg); + transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); } } catch (AdaptorException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); ctx.close(); } -// AdaptorToSessionActorMsg msg = null; -// try { -// if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { -// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg); -// } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { -// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg); -// } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { -// msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg); -// if (msgId >= 0) { -// ctx.writeAndFlush(createMqttPubAckMsg(msgId)); -// } -// } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) { -// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg); -// if (msgId >= 0) { -// ctx.writeAndFlush(createMqttPubAckMsg(msgId)); -// } -// } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) { -// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg); -// if (msgId >= 0) { -// ctx.writeAndFlush(createMqttPubAckMsg(msgId)); -// } -// } -// } catch (AdaptorException e) { -// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); -// } -// if (msg != null) { -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); -// } else { -// log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); -// ctx.close(); -// } } private TransportServiceCallback getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) { @@ -555,4 +530,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e); } } + + @Override + public void onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { + log.trace("[{}] Received the remote command to close the session", sessionId); + processDisconnect(deviceSessionCtx.getChannel()); + } + + @Override + public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { + log.trace("[{}] Received RPC command to device", sessionId); + try { + adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + } catch (Exception e) { + log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e); + } + } + + @Override + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) { + log.trace("[{}] Received RPC command to device", sessionId); + try { + adaptor.convertToPublish(deviceSessionCtx, rpcResponse).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + } catch (Exception e) { + log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e); + } + } } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index d4ab033cee..a2be169656 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,6 @@ package org.thingsboard.server.transport.mqtt.adaptors; import com.google.gson.Gson; import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import io.netty.buffer.ByteBuf; @@ -98,6 +97,31 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } + @Override + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { + String topicName = inbound.variableHeader().topicName(); + try { + Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length())); + String payload = inbound.payload().toString(UTF8); + return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build(); + } catch (RuntimeException e) { + log.warn("Failed to decode get attributes request", e); + throw new AdaptorException(e); + } + } + + @Override + public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { + String topicName = inbound.variableHeader().topicName(); + String payload = validatePayload(ctx.getSessionId(), inbound.payload()); + try { + Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length())); + return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId); + } catch (IllegalStateException | JsonSyntaxException ex) { + throw new AdaptorException(ex); + } + } + @Override public Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { if (!StringUtils.isEmpty(responseMsg.getError())) { @@ -115,9 +139,17 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @Override public Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException { - return Optional.of(createMqttPublishMsg(ctx, - MqttTopics.DEVICE_ATTRIBUTES_TOPIC, - JsonConverter.toJson(notificationMsg))); + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg))); + } + + @Override + public Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException { + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false))); + } + + @Override + public Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) { + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse))); } @Override @@ -149,7 +181,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { msg = convertToRpcCommandResponse(ctx, (MqttPublishMessage) inbound); break; case TO_SERVER_RPC_REQUEST: - msg = convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound); + msg = null;//convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound); break; default: log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type); @@ -181,13 +213,12 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { break; case TO_DEVICE_RPC_REQUEST: ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg; - result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), - rpcRequest); + result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), rpcRequest); break; case TO_SERVER_RPC_RESPONSE: - ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg; - result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), - rpcResponse); +// ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg; +// result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), +// rpcResponse); break; case RULE_ENGINE_ERROR: RuleEngineErrorMsg errorMsg = (RuleEngineErrorMsg) msg; @@ -232,7 +263,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false)); } - private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) { + private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) { return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg)); } @@ -290,7 +321,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { String payload = validatePayload(ctx.getSessionId(), inbound.payload()); try { - return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId()); + return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().packetId()); } catch (IllegalStateException | JsonSyntaxException ex) { throw new AdaptorException(ex); } @@ -299,18 +330,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { String payload = validatePayload(ctx.getSessionId(), inbound.payload()); try { - return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId()); - } catch (IllegalStateException | JsonSyntaxException ex) { - throw new AdaptorException(ex); - } - } - - private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { - String topicName = inbound.variableHeader().topicName(); - String payload = validatePayload(ctx.getSessionId(), inbound.payload()); - try { - Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length())); - return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId); + return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().packetId()); } catch (IllegalStateException | JsonSyntaxException ex) { throw new AdaptorException(ex); } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index 602fde12f6..48173f968a 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -19,7 +19,15 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.thingsboard.server.common.transport.TransportAdaptor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import java.util.Optional; @@ -29,15 +37,21 @@ import java.util.Optional; */ public interface MqttTransportAdaptor extends TransportAdaptor { - TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; + PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; - TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; + PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; - TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; + GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; - Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException; + ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException; - Optional convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; + ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException; + Optional convertToPublish(DeviceSessionCtx ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException; + Optional convertToPublish(DeviceSessionCtx ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; + + Optional convertToPublish(DeviceSessionCtx ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; + + Optional convertToPublish(DeviceSessionCtx ctx, ToServerRpcResponseMsg rpcResponse); } diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java index 648b01c9c3..15392386c4 100644 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -164,6 +164,15 @@ public class MqttTransportService implements TransportService { if (toSessionMsg.hasAttributeUpdateNotification()) { listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification()); } + if (toSessionMsg.hasSessionCloseNotification()) { + listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification()); + } + if (toSessionMsg.hasToDeviceRequest()) { + listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest()); + } + if (toSessionMsg.hasToServerResponse()) { + listener.onToServerRpcResponse(toSessionMsg.getToServerResponse()); + } }); } else { //TODO: should we notify the device actor about missed session? @@ -272,6 +281,24 @@ public class MqttTransportService implements TransportService { send(sessionInfo, toRuleEngineMsg, callback); } + @Override + public void process(SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setToDeviceRPCCallResponse(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + + @Override + public void process(SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setToServerRPCCallRequest(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + @Override public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) { sessions.putIfAbsent(toId(sessionInfo), listener);