Browse Source

MQTT API implementation

pull/1166/head
Andrew Shvayka 8 years ago
parent
commit
950ec8d622
  1. 187
      application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
  2. 41
      application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
  3. 12
      application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
  4. 17
      common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java
  5. 6
      common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
  6. 36
      common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
  7. 29
      common/transport/src/main/proto/transport.proto
  8. 7
      transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
  9. 2
      transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
  10. 71
      transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
  11. 76
      transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
  12. 26
      transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
  13. 35
      transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java

187
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -32,7 +32,6 @@ import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@ -44,18 +43,33 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@ -76,8 +90,6 @@ import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.thingsboard.server.gen.transport.TransportProtos.*;
/**
* @author Andrew Shvayka
*/
@ -123,11 +135,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestBody body = request.getBody();
ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
rpcSeq++,
body.getMethod(),
body.getParams()
);
ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
rpcSeq++).setMethodName(body.getMethod()).setParams(body.getParams()).build();
long timeout = request.getExpirationTime() - System.currentTimeMillis();
if (timeout <= 0) {
@ -136,13 +145,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
boolean sent = rpcSubscriptions.size() > 0;
Set<SessionId> syncSessionSet = new HashSet<>();
Set<UUID> syncSessionSet = new HashSet<>();
rpcSubscriptions.entrySet().forEach(sub -> {
// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
// sendMsgToSessionActor(response, sub.getValue().getServer());
// if (SessionType.SYNC == sub.getValue().getType()) {
// syncSessionSet.add(sub.getKey());
// }
sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId());
if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) {
syncSessionSet.add(sub.getKey());
}
});
syncSessionSet.forEach(rpcSubscriptions::remove);
@ -175,10 +183,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) {
TransportProtos.SessionType sessionType = getSessionType(sessionId);
if (!toDeviceRpcPendingMap.isEmpty()) {
logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
if (type == SessionType.SYNC) {
if (sessionType == TransportProtos.SessionType.SYNC) {
logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
}
@ -186,16 +195,16 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
}
Set<Integer> sentOneWayIds = new HashSet<>();
if (type == SessionType.ASYNC) {
toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
if (sessionType == TransportProtos.SessionType.ASYNC) {
toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
} else {
toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
}
sentOneWayIds.forEach(toDeviceRpcPendingMap::remove);
}
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
return entry -> {
ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
@ -204,19 +213,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
sentOneWayIds.add(entry.getKey());
systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null));
}
ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
entry.getKey(),
body.getMethod(),
body.getParams()
);
// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
// sendMsgToSessionActor(response, server);
ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build();
sendToTransport(rpcRequest, sessionId, nodeId);
};
}
void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
TransportToDeviceActorMsg msg = wrapper.getMsg();
// processRpcResponses(context, msg);
if (msg.hasSessionEvent()) {
processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
}
@ -237,15 +241,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (msg.hasGetAttributes()) {
handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
}
// SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
// if (sessionMsgType.requiresRulesProcessing()) {
// switch (sessionMsgType) {
// case TO_SERVER_RPC_REQUEST:
// handleClientSideRPCRequest(context, msg);
// reportActivity();
// break;
// }
// }
if (msg.hasToDeviceRPCCallResponse()) {
processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse());
}
if (msg.hasToServerRPCCallRequest()) {
handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
reportActivity();
}
}
private void reportActivity() {
@ -314,36 +316,42 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
// private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
// ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
//
// JsonObject json = new JsonObject();
// json.addProperty("method", request.getMethod());
// json.add("params", jsonParser.parse(request.getParams()));
//
// TbMsgMetaData requestMetaData = defaultMetaData.copy();
// requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
// TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
// PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
// pushToRuleEngineWithTimeout(context, tbMsg, msgData);
//
// scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
// toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
// }
private void handleClientSideRPCRequest(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg request) {
UUID sessionId = getSessionId(sessionInfo);
JsonObject json = new JsonObject();
json.addProperty("method", request.getMethodName());
json.add("params", jsonParser.parse(request.getParams()));
TbMsgMetaData requestMetaData = defaultMetaData.copy();
requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(sessionId, getSessionType(sessionId), sessionInfo.getNodeId()));
}
public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
private TransportProtos.SessionType getSessionType(UUID sessionId) {
return sessions.containsKey(sessionId) ? TransportProtos.SessionType.ASYNC : TransportProtos.SessionType.SYNC;
}
void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
if (data != null) {
logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
.setRequestId(msg.getId()).setError("timeout").build()
, data.getSessionId(), data.getNodeId());
}
}
void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
int requestId = msg.getMsg().getRequestId();
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
if (data != null) {
// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
.setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
, data.getSessionId(), data.getNodeId());
}
}
@ -382,7 +390,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (hasNotificationData) {
AttributeUpdateNotificationMsg finalNotification = notification.build();
attributeSubscriptions.entrySet().forEach(sub -> {
sendToTransport(finalNotification, sub.getKey(), sub.getValue());
sendToTransport(finalNotification, sub.getKey(), sub.getValue().getNodeId());
});
}
} else {
@ -390,6 +398,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
UUID sessionId = getSessionId(sessionInfo);
logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
boolean success = requestMd != null;
if (success) {
systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
requestMd.getMsg().getServerAddress(), responseMsg.getPayload(), null));
} else {
logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
}
}
// private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {
// SessionId sessionId = msg.getSessionId();
// FromDeviceMsg inMsg = msg.getPayload();
@ -424,7 +445,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.remove(sessionId);
@ -438,8 +459,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private UUID getSessionId(SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
}
private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
@ -450,11 +475,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.put(sessionId, session);
sendPendingRequests(context, sessionId, sessionInfo);
}
}
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
UUID sessionId = getSessionId(sessionInfo);
if (msg.getEvent() == SessionEvent.OPEN) {
logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
@ -548,14 +574,31 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
}
private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, SessionInfo sessionInfo) {
private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setAttributeUpdateNotification(notificationMsg).build();
systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
systemContext.getRuleEngineTransportService().process(nodeId, msg);
}
private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setToDeviceRequest(rpcMsg).build();
systemContext.getRuleEngineTransportService().process(nodeId, msg);
}
private void sendToTransport(TransportProtos.ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setToServerResponse(rpcMsg).build();
systemContext.getRuleEngineTransportService().process(nodeId, msg);
}
private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
List<TsKvProto> clientAttributes;
if (result == null || result.isEmpty()) {

41
application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java

@ -1,41 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import java.util.Optional;
import java.util.UUID;
/**
* Created by ashvayka on 17.04.18.
*/
@Data
@AllArgsConstructor
public final class PendingSessionMsgData {
private final UUID sessionId;
private final Optional<ServerAddress> serverAddress;
private final SessionMsgType sessionMsgType;
private final int requestId;
private final boolean replyOnQueueAck;
private int ackMsgCount;
}

12
application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java

@ -16,18 +16,16 @@
package org.thingsboard.server.actors.device;
import lombok.Data;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Optional;
import java.util.UUID;
/**
* @author Andrew Shvayka
*/
@Data
public class ToServerRpcRequestMetadata {
private final SessionId sessionId;
private final SessionType type;
private final Optional<ServerAddress> server;
private final UUID sessionId;
private final TransportProtos.SessionType type;
private final String nodeId;
}

17
common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -15,8 +15,11 @@
*/
package org.thingsboard.server.common.transport;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
/**
* Created by ashvayka on 04.10.18.
@ -26,4 +29,10 @@ public interface SessionMsgListener {
void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification);
void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification);
void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest);
void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse);
}

6
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.common.transport;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@ -49,6 +51,10 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
void deregisterSession(SessionInfoProto sessionInfo);

36
common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSyntaxException;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
@ -95,6 +96,15 @@ public class JsonConverter {
}
}
public static JsonElement toJson(TransportProtos.ToDeviceRpcRequestMsg msg, boolean includeRequestId) {
JsonObject result = new JsonObject();
if (includeRequestId) {
result.addProperty("id", msg.getRequestId());
}
result.addProperty("method", msg.getMethodName());
result.add("params", new JsonParser().parse(msg.getParams()));
return result;
}
private static void parseObject(PostTelemetryMsg.Builder builder, long systemTs, JsonElement jsonObject) {
JsonObject jo = jsonObject.getAsJsonObject();
@ -112,14 +122,14 @@ public class JsonConverter {
request.addTsKvList(builder.build());
}
public static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
private static void parseWithTs(PostTelemetryMsg.Builder request, JsonObject jo) {
TsKvListProto.Builder builder = TsKvListProto.newBuilder();
builder.setTs(jo.get("ts").getAsLong());
builder.addAllKv(parseProtoValues(jo.get("values").getAsJsonObject()));
request.addTsKvList(builder.build());
}
public static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
private static List<KeyValueProto> parseProtoValues(JsonObject valuesObject) {
List<KeyValueProto> result = new ArrayList<>();
for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
JsonElement element = valueEntry.getValue();
@ -172,9 +182,9 @@ public class JsonConverter {
return request;
}
public static ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException {
public static TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(JsonElement json, int requestId) throws JsonSyntaxException {
JsonObject object = json.getAsJsonObject();
return new ToServerRpcRequestMsg(requestId, object.get("method").getAsString(), GSON.toJson(object.get("params")));
return TransportProtos.ToServerRpcRequestMsg.newBuilder().setRequestId(requestId).setMethodName(object.get("method").getAsString()).setParams(GSON.toJson(object.get("params"))).build();
}
private static void parseObject(BasicTelemetryUploadRequest request, long systemTs, JsonElement jsonObject) {
@ -368,8 +378,14 @@ public class JsonConverter {
return result;
}
public static JsonElement toJson(ToServerRpcResponseMsg msg) {
return new JsonParser().parse(msg.getData());
public static JsonElement toJson(TransportProtos.ToServerRpcResponseMsg msg) {
if (StringUtils.isEmpty(msg.getError())) {
return new JsonParser().parse(msg.getPayload());
} else {
JsonObject errorMsg = new JsonObject();
errorMsg.addProperty("error", msg.getError());
return errorMsg;
}
}
public static JsonElement toErrorJson(String errorMsg) {

29
common/transport/src/main/proto/transport.proto

@ -137,6 +137,29 @@ message SubscribeToRPCMsg {
bool unsubscribe = 1;
}
message ToDeviceRpcRequestMsg {
int32 requestId = 1;
string methodName = 2;
string params = 3;
}
message ToDeviceRpcResponseMsg {
int32 requestId = 1;
string payload = 2;
}
message ToServerRpcRequestMsg {
int32 requestId = 1;
string methodName = 2;
string params = 3;
}
message ToServerRpcResponseMsg {
int32 requestId = 1;
string payload = 2;
string error = 3;
}
message TransportToDeviceActorMsg {
SessionInfoProto sessionInfo = 1;
SessionEventMsg sessionEvent = 2;
@ -144,7 +167,9 @@ message TransportToDeviceActorMsg {
PostAttributeMsg postAttributes = 4;
GetAttributeRequestMsg getAttributes = 5;
SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6;
SubscribeToRPCMsg subscribeToRPC= 7;
SubscribeToRPCMsg subscribeToRPC = 7;
ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8;
ToServerRpcRequestMsg toServerRPCCallRequest = 9;
}
message DeviceActorToTransportMsg {
@ -153,6 +178,8 @@ message DeviceActorToTransportMsg {
SessionCloseNotificationProto sessionCloseNotification = 3;
GetAttributeResponseMsg getAttributesResponse = 4;
AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
ToDeviceRpcRequestMsg toDeviceRequest = 6;
ToServerRpcResponseMsg toServerResponse = 7;
}
/**

7
transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java

@ -98,7 +98,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
String payload = validatePayload(ctx, inbound);
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
// return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), 0);
return null;
}
@Override
@ -225,8 +226,8 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
private Response convertToServerRpcResponse(SessionContext ctx, ToServerRpcResponseMsg msg) {
if (msg.isSuccess()) {
Response response = new Response(ResponseCode.CONTENT);
JsonElement result = JsonConverter.toJson(msg);
response.setPayload(result.toString());
// JsonElement result = JsonConverter.toJson(msg);
// response.setPayload(result.toString());
return response;
} else {
return convertError(Optional.of(new RuntimeException("Server RPC response is empty!")));

2
transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java

@ -114,7 +114,7 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
}
private void reply(ToServerRpcResponseMsg msg) {
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
}
private void reply(AttributesUpdateNotification msg) {

71
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -223,43 +223,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)){
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
transportService.process(sessionInfo, rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)){
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
}
// AdaptorToSessionActorMsg msg = null;
// try {
// if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
// } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
// } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
// msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
// if (msgId >= 0) {
// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
// }
// } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
// if (msgId >= 0) {
// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
// }
// } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
// if (msgId >= 0) {
// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
// }
// }
// } catch (AdaptorException e) {
// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
// }
// if (msg != null) {
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
// } else {
// log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
// ctx.close();
// }
}
private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
@ -555,4 +530,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
}
}
@Override
public void onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
log.trace("[{}] Received the remote command to close the session", sessionId);
processDisconnect(deviceSessionCtx.getChannel());
}
@Override
public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
log.trace("[{}] Received RPC command to device", sessionId);
try {
adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e);
}
}
@Override
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
log.trace("[{}] Received RPC command to device", sessionId);
try {
adaptor.convertToPublish(deviceSessionCtx, rpcResponse).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
log.trace("[{}] Failed to convert device RPC commandto MQTT msg", sessionId, e);
}
}
}

76
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,7 +17,6 @@ package org.thingsboard.server.transport.mqtt.adaptors;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import io.netty.buffer.ByteBuf;
@ -98,6 +97,31 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
@Override
public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
String payload = inbound.payload().toString(UTF8);
return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
} catch (RuntimeException e) {
log.warn("Failed to decode get attributes request", e);
throw new AdaptorException(e);
}
}
@Override
public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
@Override
public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
if (!StringUtils.isEmpty(responseMsg.getError())) {
@ -115,9 +139,17 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
@Override
public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
return Optional.of(createMqttPublishMsg(ctx,
MqttTopics.DEVICE_ATTRIBUTES_TOPIC,
JsonConverter.toJson(notificationMsg)));
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg)));
}
@Override
public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
}
@Override
public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
}
@Override
@ -149,7 +181,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
msg = convertToRpcCommandResponse(ctx, (MqttPublishMessage) inbound);
break;
case TO_SERVER_RPC_REQUEST:
msg = convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound);
msg = null;//convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound);
break;
default:
log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type);
@ -181,13 +213,12 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
break;
case TO_DEVICE_RPC_REQUEST:
ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;
result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(),
rpcRequest);
result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), rpcRequest);
break;
case TO_SERVER_RPC_RESPONSE:
ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),
rpcResponse);
// ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
// result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),
// rpcResponse);
break;
case RULE_ENGINE_ERROR:
RuleEngineErrorMsg errorMsg = (RuleEngineErrorMsg) msg;
@ -232,7 +263,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));
}
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) {
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) {
return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg));
}
@ -290,7 +321,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId());
return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().packetId());
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
@ -299,18 +330,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId());
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().packetId());
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}

26
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java

@ -19,7 +19,15 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.thingsboard.server.common.transport.TransportAdaptor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import java.util.Optional;
@ -29,15 +37,21 @@ import java.util.Optional;
*/
public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException;
ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToServerRpcResponseMsg rpcResponse);
}

35
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -164,6 +164,15 @@ public class MqttTransportService implements TransportService {
if (toSessionMsg.hasAttributeUpdateNotification()) {
listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
}
if (toSessionMsg.hasSessionCloseNotification()) {
listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
}
if (toSessionMsg.hasToDeviceRequest()) {
listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
}
if (toSessionMsg.hasToServerResponse()) {
listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
}
});
} else {
//TODO: should we notify the device actor about missed session?
@ -272,6 +281,24 @@ public class MqttTransportService implements TransportService {
send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
public void process(SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setToDeviceRPCCallResponse(msg).build()
).build();
send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
public void process(SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setToServerRPCCallRequest(msg).build()
).build();
send(sessionInfo, toRuleEngineMsg, callback);
}
@Override
public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), listener);

Loading…
Cancel
Save