From 3c7295ee8e2ca2a085ff3a7338ad4f8c84437436 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 19 Apr 2018 17:07:11 +0300 Subject: [PATCH] RPC --- .../server/actors/ActorSystemContext.java | 4 + .../server/actors/device/DeviceActor.java | 19 +++- .../device/DeviceActorMessageProcessor.java | 93 +++++++++++----- .../actors/device/PendingSessionMsgData.java | 1 + .../device/ToDeviceRpcRequestMetadata.java | 4 +- .../device/ToServerRpcRequestMetadata.java | 33 ++++++ .../actors/rpc/BasicRpcSessionListener.java | 6 +- .../actors/ruleChain/DefaultTbContext.java | 46 +++++++- .../server/controller/RpcController.java | 88 ++++++++++++++- .../cluster/rpc/ClusterGrpcService.java | 6 +- .../cluster/rpc/ClusterRpcService.java | 4 +- .../service/rpc/DefaultDeviceRpcService.java | 104 ++++-------------- .../server/service/rpc/DeviceRpcService.java | 13 +-- ...g.java => ToDeviceRpcRequestActorMsg.java} | 4 +- .../rpc/ToServerRpcResponseActorMsg.java | 60 ++++++++++ .../src/main/resources/thingsboard.yml | 2 + .../server/common/msg/MsgType.java | 6 +- .../common/msg/core/RuleEngineError.java | 2 +- .../common/msg/core/RuleEngineErrorMsg.java | 2 + ...> DeviceActorClientSideRpcTimeoutMsg.java} | 6 +- .../DeviceActorServerSideRpcTimeoutMsg.java | 33 ++++++ .../api/RuleEngineDeviceRpcRequest.java | 36 ++++++ .../api/RuleEngineDeviceRpcResponse.java | 37 +++++++ .../rule/engine/api/RuleEngineRpcService.java | 30 +++++ .../rule/engine/api/TbContext.java | 6 + .../rule/engine/api/TbRelationTypes.java | 26 +++++ .../rule/engine/rpc/TbSendRPCReplyNode.java | 66 +++++++++++ .../rule/engine/rpc/TbSendRPCRequestNode.java | 101 +++++++++++++++++ .../rpc/TbSendRpcReplyNodeConfiguration.java | 33 ++++++ .../TbSendRpcRequestNodeConfiguration.java | 32 ++++++ 30 files changed, 754 insertions(+), 149 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java rename application/src/main/java/org/thingsboard/server/service/rpc/{ToDeviceRpcRequestMsg.java => ToDeviceRpcRequestActorMsg.java} (92%) create mode 100644 application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java rename common/message/src/main/java/org/thingsboard/server/common/msg/timeout/{DeviceActorRpcTimeoutMsg.java => DeviceActorClientSideRpcTimeoutMsg.java} (79%) create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 63029639a4..c1fd8782e5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -199,6 +199,10 @@ public class ActorSystemContext { @Getter private long queuePersistenceTimeout; + @Value("${actors.client_side_rpc.timeout}") + @Getter + private long clientSideRpcTimeout; + @Value("${actors.rule.chain.error_persist_frequency}") @Getter private long ruleChainErrorPersistFrequency; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index dfae339417..93df1808d5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -25,12 +25,13 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; +import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg; -import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg; +import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg; -import org.thingsboard.server.common.msg.timeout.TimeoutMsg; -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; public class DeviceActor extends ContextAwareActor { @@ -62,10 +63,16 @@ public class DeviceActor extends ContextAwareActor { processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg); break; case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: - processor.processRpcRequest(context(), (ToDeviceRpcRequestMsg) msg); + processor.processRpcRequest(context(), (ToDeviceRpcRequestActorMsg) msg); break; - case DEVICE_ACTOR_RPC_TIMEOUT_MSG: - processor.processRpcTimeout(context(), (DeviceActorRpcTimeoutMsg) msg); + case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: + processor.processToServerRPCResponse(context(), (ToServerRpcResponseActorMsg) msg); + break; + case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG: + processor.processServerSideRpcTimeout(context(), (DeviceActorServerSideRpcTimeoutMsg) msg); + break; + case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG: + processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg); break; case DEVICE_ACTOR_QUEUE_TIMEOUT_MSG: processor.processQueueTimeout(context(), (DeviceActorQueueTimeoutMsg) msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 26c0a28068..5c1524133b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.DataConstants; @@ -53,28 +54,28 @@ import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg; import org.thingsboard.server.common.msg.core.SessionCloseMsg; import org.thingsboard.server.common.msg.core.SessionCloseNotification; import org.thingsboard.server.common.msg.core.SessionOpenMsg; -import org.thingsboard.server.common.msg.core.StatusCodeResponse; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg; import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; +import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg; import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; -import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.session.SessionType; import org.thingsboard.server.common.msg.session.ToDeviceMsg; +import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg; -import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg; +import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg; -import org.thingsboard.server.extensions.api.plugins.PluginCallback; -import org.thingsboard.server.extensions.api.plugins.PluginContext; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; import org.thingsboard.server.extensions.api.plugins.msg.RpcError; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; import javax.annotation.Nullable; import java.util.ArrayList; @@ -87,7 +88,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Predicate; @@ -103,10 +103,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private final Map sessions; private final Map attributeSubscriptions; private final Map rpcSubscriptions; - private final Map rpcPendingMap; + private final Map toDeviceRpcPendingMap; + private final Map toServerRpcPendingMap; private final Map pendingMsgs; private final Gson gson = new Gson(); + private final JsonParser jsonParser = new JsonParser(); private int rpcSeq = 0; private String deviceName; @@ -120,7 +122,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso this.sessions = new HashMap<>(); this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); - this.rpcPendingMap = new HashMap<>(); + this.toDeviceRpcPendingMap = new HashMap<>(); + this.toServerRpcPendingMap = new HashMap<>(); this.pendingMsgs = new HashMap<>(); initAttributes(); } @@ -134,7 +137,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso this.defaultMetaData.putValue("deviceType", deviceType); } - void processRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg) { + void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) { ToDeviceRpcRequest request = msg.getMsg(); ToDeviceRpcRequestBody body = request.getBody(); ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg( @@ -174,14 +177,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } - private void registerPendingRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { - rpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent)); - DeviceActorRpcTimeoutMsg timeoutMsg = new DeviceActorRpcTimeoutMsg(rpcRequest.getRequestId(), timeout); + private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { + toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent)); + DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout); scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout()); } - void processRpcTimeout(ActorContext context, DeviceActorRpcTimeoutMsg msg) { - ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(msg.getId()); + void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) { + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); if (requestMd != null) { logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), @@ -200,7 +203,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) { PendingSessionMsgData data = pendingMsgs.remove(msg.getId()); - if (data != null) { + if (data != null && data.isReplyOnQueueAck()) { logger.debug("[{}] Queue put [{}] ack detected!", deviceId, msg.getId()); ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId()); sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress()); @@ -208,8 +211,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional server) { - if (!rpcPendingMap.isEmpty()) { - logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, rpcPendingMap.size(), sessionId); + if (!toDeviceRpcPendingMap.isEmpty()) { + logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); if (type == SessionType.SYNC) { logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); @@ -219,12 +222,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } Set sentOneWayIds = new HashSet<>(); if (type == SessionType.ASYNC) { - rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds)); + toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds)); } else { - rpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds)); + toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds)); } - sentOneWayIds.forEach(rpcPendingMap::remove); + sentOneWayIds.forEach(toDeviceRpcPendingMap::remove); } private Consumer> processPendingRpc(ActorContext context, SessionId sessionId, Optional server, Set sentOneWayIds) { @@ -263,8 +266,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso handlePostTelemetryRequest(context, msg); break; case TO_SERVER_RPC_REQUEST: + handleClientSideRPCRequest(context, msg); break; - //TODO: push to queue and start processing! } } } @@ -345,11 +348,47 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso pushToRuleEngineWithTimeout(context, tbMsg, src, request); } + private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) { + ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload(); + + JsonObject json = new JsonObject(); + json.addProperty("method", request.getMethod()); + json.add("params", jsonParser.parse(request.getParams())); + + TbMsgMetaData requestMetaData = defaultMetaData.copy(); + requestMetaData.putValue("requestId", Integer.toString(request.getRequestId())); + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json)); + pushToRuleEngineWithTimeout(context, tbMsg, src, request); + + scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout()); + toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress())); + } + + public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) { + ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId()); + if (data != null) { + logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId()); + ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT); + sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer()); + } + } + + void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) { + ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId()); + if (data != null) { + sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer()); + } + } + private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) { + pushToRuleEngineWithTimeout(context, tbMsg, src, fromDeviceRequestMsg, true); + } + + private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg, boolean replyOnAck) { SessionMsgType sessionMsgType = fromDeviceRequestMsg.getMsgType(); int requestId = fromDeviceRequestMsg.getRequestId(); if (systemContext.isQueuePersistenceEnabled()) { - pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId)); + pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId, replyOnAck)); scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout()); } else { ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), src.getSessionId()); @@ -394,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) { logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId); ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg; - ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(responseMsg.getRequestId()); + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); boolean success = requestMd != null; if (success) { systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getData(), null)); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java index b3f381d94f..2ce05b2a4e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java @@ -32,5 +32,6 @@ public final class PendingSessionMsgData { private final Optional serverAddress; private final SessionMsgType sessionMsgType; private final int requestId; + private final boolean replyOnQueueAck; } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java index 4ee9a201f8..8a4262c5b2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java @@ -16,13 +16,13 @@ package org.thingsboard.server.actors.device; import lombok.Data; -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; /** * @author Andrew Shvayka */ @Data public class ToDeviceRpcRequestMetadata { - private final ToDeviceRpcRequestMsg msg; + private final ToDeviceRpcRequestActorMsg msg; private final boolean sent; } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java new file mode 100644 index 0000000000..f82a8c2481 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.device; + +import lombok.Data; +import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.session.SessionType; + +import java.util.Optional; + +/** + * @author Andrew Shvayka + */ +@Data +public class ToServerRpcRequestMetadata { + private final SessionId sessionId; + private final SessionType type; + private final Optional server; +} diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java index 65cbc229e7..bc36dc8098 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java @@ -37,7 +37,7 @@ import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.service.cluster.rpc.GrpcSession; import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener; -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import java.io.Serializable; import java.util.UUID; @@ -142,14 +142,14 @@ public class BasicRpcSessionListener implements GrpcSessionListener { return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb()); } - private static ToDeviceRpcRequestMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) { + private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) { TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId())); DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId())); ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams()); ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody); - return new ToDeviceRpcRequestMsg(serverAddress, request); + return new ToDeviceRpcRequestActorMsg(serverAddress, request); } private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 8ffd378da7..e1331f99be 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,15 +16,20 @@ package org.thingsboard.server.actors.ruleChain; import akka.actor.ActorRef; -import akka.actor.Cancellable; +import com.datastax.driver.core.utils.UUIDs; import com.google.common.base.Function; import org.thingsboard.rule.engine.api.*; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; @@ -41,6 +46,7 @@ import scala.concurrent.duration.Duration; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Created by ashvayka on 19.03.18. @@ -112,6 +118,11 @@ class DefaultTbContext implements TbContext { nodeCtx.setSelf(self); } + @Override + public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { + return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data); + } + @Override public RuleNodeId getSelfId() { return nodeCtx.getSelf().getId(); @@ -206,4 +217,29 @@ class DefaultTbContext implements TbContext { public MailService getMailService() { return mainCtx.getMailService(); } + + @Override + public RuleEngineRpcService getRpcService() { + return new RuleEngineRpcService() { + @Override + public void sendRpcReply(DeviceId deviceId, int requestId, String body) { + mainCtx.getDeviceRpcService().sendRpcReplyToDevice(nodeCtx.getTenantId(), deviceId, requestId, body); + } + + @Override + public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer consumer) { + ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(), + src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody())); + mainCtx.getDeviceRpcService().process(request, response -> { + consumer.accept(RuleEngineDeviceRpcResponse.builder() + .deviceId(src.getDeviceId()) + .requestId(src.getRequestId()) + .error(response.getError()) + .response(response.getResponse()) + .build()); + }); + } + + }; + } } diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java index 2c7d2e6967..28261be14c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java @@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -30,18 +31,22 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -import org.thingsboard.server.actors.plugin.ValidationResult; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.rpc.RpcRequest; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity; import org.thingsboard.server.extensions.api.plugins.PluginConstants; -import org.thingsboard.server.common.data.rpc.RpcRequest; -import org.thingsboard.server.service.rpc.LocalRequestMetaData; +import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; +import org.thingsboard.server.extensions.api.plugins.msg.RpcError; import org.thingsboard.server.service.rpc.DeviceRpcService; +import org.thingsboard.server.service.rpc.LocalRequestMetaData; import org.thingsboard.server.service.security.AccessValidator; import org.thingsboard.server.service.security.model.SecurityUser; @@ -53,6 +58,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; /** * Created by ashvayka on 22.03.18. @@ -117,7 +123,6 @@ public class RpcController extends BaseController { accessValidator.validate(currentUser, deviceId, new HttpValidationCallback(response, new FutureCallback>() { @Override public void onSuccess(@Nullable DeferredResult result) { - ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(), tenantId, deviceId, @@ -125,7 +130,13 @@ public class RpcController extends BaseController { timeout, body ); - deviceRpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, result)); + deviceRpcService.process(rpcRequest, new Consumer(){ + + @Override + public void accept(FromDeviceRpcResponse fromDeviceRpcResponse) { + reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse); + } + }); } @Override @@ -136,7 +147,7 @@ public class RpcController extends BaseController { } else { entity = new ResponseEntity(HttpStatus.UNAUTHORIZED); } - deviceRpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e); + logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e); response.setResult(entity); } })); @@ -146,4 +157,69 @@ public class RpcController extends BaseController { } } + public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) { + Optional rpcError = response.getError(); + DeferredResult responseWriter = rpcRequest.getResponseWriter(); + if (rpcError.isPresent()) { + logRpcCall(rpcRequest, rpcError, null); + RpcError error = rpcError.get(); + switch (error) { + case TIMEOUT: + responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); + break; + case NO_ACTIVE_CONNECTION: + responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT)); + break; + default: + responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); + break; + } + } else { + Optional responseData = response.getResponse(); + if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) { + String data = responseData.get(); + try { + logRpcCall(rpcRequest, rpcError, null); + responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK)); + } catch (IOException e) { + log.debug("Failed to decode device response: {}", data, e); + logRpcCall(rpcRequest, rpcError, e); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE)); + } + } else { + logRpcCall(rpcRequest, rpcError, null); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); + } + } + } + + private void logRpcCall(LocalRequestMetaData rpcRequest, Optional rpcError, Throwable e) { + logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null); + } + + + private void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional rpcError, Throwable e) { + String rpcErrorStr = ""; + if (rpcError.isPresent()) { + rpcErrorStr = "RPC Error: " + rpcError.get().name(); + } + String method = body.getMethod(); + String params = body.getParams(); + + auditLogService.logEntityAction( + user.getTenantId(), + user.getCustomerId(), + user.getId(), + user.getName(), + (UUIDBased & EntityId) entityId, + null, + ActionType.RPC_CALL, + BaseController.toException(e), + rpcErrorStr, + oneWay, + method, + params); + } + + } diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java index cdf8842f5a..27334c6601 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java @@ -40,7 +40,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc; import org.thingsboard.server.service.cluster.discovery.ServerInstance; import org.thingsboard.server.service.cluster.discovery.ServerInstanceService; -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import javax.annotation.PreDestroy; import java.io.IOException; @@ -132,7 +132,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI } @Override - public void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward) { + public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) { ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder() .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build(); tell(serverAddress, msg); @@ -196,7 +196,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI ).build(); } - private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestMsg msg) { + private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) { ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder(); ToDeviceRpcRequest request = msg.getMsg(); diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java index e5a04346c3..6aefe46a63 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java @@ -24,7 +24,7 @@ import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg; import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import java.util.UUID; @@ -41,7 +41,7 @@ public interface ClusterRpcService { void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward); - void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward); + void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward); void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java index 4a730dbe23..fcb9c269ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.rpc; -import akka.actor.ActorRef; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -28,12 +27,15 @@ import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.controller.BaseController; import org.thingsboard.server.dao.audit.AuditLogService; +import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; import org.thingsboard.server.extensions.api.plugins.msg.RpcError; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; @@ -51,6 +53,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * Created by ashvayka on 27.03.18. @@ -59,8 +62,6 @@ import java.util.function.BiConsumer; @Slf4j public class DefaultDeviceRpcService implements DeviceRpcService { - private static final ObjectMapper jsonMapper = new ObjectMapper(); - @Autowired private ClusterRoutingService routingService; @@ -75,7 +76,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { private ScheduledExecutorService rpcCallBackExecutor; - private final ConcurrentMap localRpcRequests = new ConcurrentHashMap<>(); + private final ConcurrentMap> localRpcRequests = new ConcurrentHashMap<>(); @PostConstruct @@ -91,18 +92,18 @@ public class DefaultDeviceRpcService implements DeviceRpcService { } @Override - public void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData) { + public void process(ToDeviceRpcRequest request, Consumer responseConsumer) { log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId()); sendRpcRequest(request); UUID requestId = request.getId(); - localRpcRequests.put(requestId, metaData); + localRpcRequests.put(requestId, responseConsumer); long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); log.error("[{}] processing the request: [{}]", this.hashCode(), requestId); rpcCallBackExecutor.schedule(() -> { log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId); - LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId); - if (localMetaData != null) { - reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT)); + Consumer consumer = localRpcRequests.remove(requestId); + if (consumer != null) { + consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT)); } }, timeout, TimeUnit.MILLISECONDS); } @@ -123,58 +124,27 @@ public class DefaultDeviceRpcService implements DeviceRpcService { log.error("[{}] response the request: [{}]", this.hashCode(), response.getId()); //TODO: send to another server if needed. UUID requestId = response.getId(); - LocalRequestMetaData md = localRpcRequests.remove(requestId); - if (md != null) { - log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId()); - reply(md, response); + Consumer consumer = localRpcRequests.remove(requestId); + if (consumer != null) { + consumer.accept(response); } else { log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response); } } - public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) { - Optional rpcError = response.getError(); - DeferredResult responseWriter = rpcRequest.getResponseWriter(); - if (rpcError.isPresent()) { - logRpcCall(rpcRequest, rpcError, null); - RpcError error = rpcError.get(); - switch (error) { - case TIMEOUT: - responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); - break; - case NO_ACTIVE_CONNECTION: - responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT)); - break; - default: - responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT)); - break; - } - } else { - Optional responseData = response.getResponse(); - if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) { - String data = responseData.get(); - try { - logRpcCall(rpcRequest, rpcError, null); - responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK)); - } catch (IOException e) { - log.debug("Failed to decode device response: {}", data, e); - logRpcCall(rpcRequest, rpcError, e); - responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE)); - } - } else { - logRpcCall(rpcRequest, rpcError, null); - responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK)); - } - } + @Override + public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) { + ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body)); + forward(deviceId, rpcMsg, rpcService::tell); } private void sendRpcRequest(ToDeviceRpcRequest msg) { log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg); - ToDeviceRpcRequestMsg rpcMsg = new ToDeviceRpcRequestMsg(msg); + ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg); forward(msg.getDeviceId(), rpcMsg, rpcService::tell); } - private void forward(DeviceId deviceId, ToDeviceRpcRequestMsg msg, BiConsumer rpcFunction) { + private void forward(DeviceId deviceId, T msg, BiConsumer rpcFunction) { Optional instance = routingService.resolveById(deviceId); if (instance.isPresent()) { log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg); @@ -184,32 +154,4 @@ public class DefaultDeviceRpcService implements DeviceRpcService { actorService.onMsg(msg); } } - - private void logRpcCall(LocalRequestMetaData rpcRequest, Optional rpcError, Throwable e) { - logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null); - } - - @Override - public void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional rpcError, Throwable e) { - String rpcErrorStr = ""; - if (rpcError.isPresent()) { - rpcErrorStr = "RPC Error: " + rpcError.get().name(); - } - String method = body.getMethod(); - String params = body.getParams(); - - auditLogService.logEntityAction( - user.getTenantId(), - user.getCustomerId(), - user.getId(), - user.getName(), - (UUIDBased & EntityId) entityId, - null, - ActionType.RPC_CALL, - BaseController.toException(e), - rpcErrorStr, - oneWay, - method, - params); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java index f58db7f7fa..429912000c 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java @@ -15,26 +15,25 @@ */ package org.thingsboard.server.service.rpc; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; -import org.thingsboard.server.extensions.api.plugins.msg.RpcError; -import org.thingsboard.server.service.security.model.SecurityUser; -import java.util.Optional; +import java.util.function.Consumer; /** * Created by ashvayka on 16.04.18. */ public interface DeviceRpcService { - void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData); + void process(ToDeviceRpcRequest request, Consumer responseConsumer); void process(ToDeviceRpcRequest request, ServerAddress originator); void process(FromDeviceRpcResponse response); - void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional rpcError, Throwable e); + void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body); + } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java similarity index 92% rename from application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java rename to application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java index ea77333445..ae7cab5b2b 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java @@ -32,13 +32,13 @@ import java.util.Optional; */ @ToString @RequiredArgsConstructor -public class ToDeviceRpcRequestMsg implements ToDeviceActorNotificationMsg { +public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg { private final ServerAddress serverAddress; @Getter private final ToDeviceRpcRequest msg; - public ToDeviceRpcRequestMsg(ToDeviceRpcRequest msg) { + public ToDeviceRpcRequestActorMsg(ToDeviceRpcRequest msg) { this(null, msg); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java new file mode 100644 index 0000000000..f3183ec846 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg; +import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg; + +import java.util.Optional; + +/** + * Created by ashvayka on 16.04.18. + */ +@ToString +@RequiredArgsConstructor +public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg { + + private final ServerAddress serverAddress; + + @Getter + private final TenantId tenantId; + + @Getter + private final DeviceId deviceId; + + @Getter + private final ToServerRpcResponseMsg msg; + + public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) { + this(null, tenantId, deviceId, msg); + } + + public Optional getServerAddress() { + return Optional.ofNullable(serverAddress); + } + + @Override + public MsgType getMsgType() { + return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG; + } +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 043b8dae3e..d9e9012670 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -229,6 +229,8 @@ actors: enabled: "${ACTORS_QUEUE_ENABLED:true}" # Maximum allowed timeout for persistence into the queue timeout: "${ACTORS_QUEUE_PERSISTENCE_TIMEOUT:30000}" + client_side_rpc: + timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}" cache: # caffeine or redis diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 6bde7fa332..74c08da043 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -75,7 +75,11 @@ public enum MsgType { DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG, - DEVICE_ACTOR_RPC_TIMEOUT_MSG, + SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG, + + DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG, + + DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG, DEVICE_ACTOR_QUEUE_TIMEOUT_MSG, diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java index f5e249c190..dcfde0fb52 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java @@ -21,7 +21,7 @@ package org.thingsboard.server.common.msg.core; public enum RuleEngineError { - QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true); + QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true), TIMEOUT; private final boolean critical; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java index 61e5cf58eb..e0ff23b379 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java @@ -44,6 +44,8 @@ public class RuleEngineErrorMsg implements ToDeviceMsg { return "Timeout during persistence of the message to the queue!"; case SERVER_ERROR: return "Error during processing of message by the server!"; + case TIMEOUT: + return "Timeout during processing of message by the server!"; default: throw new RuntimeException("Error " + error + " is not supported!"); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorRpcTimeoutMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorClientSideRpcTimeoutMsg.java similarity index 79% rename from common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorRpcTimeoutMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorClientSideRpcTimeoutMsg.java index 725db385c4..f5cdfd1c26 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorRpcTimeoutMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorClientSideRpcTimeoutMsg.java @@ -20,14 +20,14 @@ import org.thingsboard.server.common.msg.MsgType; /** * @author Andrew Shvayka */ -public final class DeviceActorRpcTimeoutMsg extends TimeoutMsg { +public final class DeviceActorClientSideRpcTimeoutMsg extends TimeoutMsg { - public DeviceActorRpcTimeoutMsg(Integer id, long timeout) { + public DeviceActorClientSideRpcTimeoutMsg(Integer id, long timeout) { super(id, timeout); } @Override public MsgType getMsgType() { - return MsgType.DEVICE_ACTOR_RPC_TIMEOUT_MSG; + return MsgType.DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG; } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java new file mode 100644 index 0000000000..cd86892b7f --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.timeout; + +import org.thingsboard.server.common.msg.MsgType; + +/** + * @author Andrew Shvayka + */ +public final class DeviceActorServerSideRpcTimeoutMsg extends TimeoutMsg { + + public DeviceActorServerSideRpcTimeoutMsg(Integer id, long timeout) { + super(id, timeout); + } + + @Override + public MsgType getMsgType() { + return MsgType.DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG; + } +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java new file mode 100644 index 0000000000..b3d724f023 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.data.id.DeviceId; + +/** + * Created by ashvayka on 02.04.18. + */ +@Data +@Builder +public final class RuleEngineDeviceRpcRequest { + + private final DeviceId deviceId; + private final int requestId; + private final boolean oneway; + private final String method; + private final String body; + private final long timeout; + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java new file mode 100644 index 0000000000..bd0f3bbf55 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.extensions.api.plugins.msg.RpcError; + +import java.util.Optional; + +/** + * Created by ashvayka on 02.04.18. + */ +@Data +@Builder +public final class RuleEngineDeviceRpcResponse { + + private final DeviceId deviceId; + private final int requestId; + private final Optional response; + private final Optional error; + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java new file mode 100644 index 0000000000..df9d8d9392 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import org.thingsboard.server.common.data.id.DeviceId; +import java.util.function.Consumer; + +/** + * Created by ashvayka on 02.04.18. + */ +public interface RuleEngineRpcService { + + void sendRpcReply(DeviceId deviceId, int requestId, String body); + + void sendRpcRequest(RuleEngineDeviceRpcRequest request, Consumer consumer); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 6038e6d84f..bd0a0628b4 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -15,10 +15,12 @@ */ package org.thingsboard.rule.engine.api; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; @@ -58,6 +60,8 @@ public interface TbContext { void updateSelf(RuleNode self); + TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data); + RuleNodeId getSelfId(); TenantId getTenantId(); @@ -78,6 +82,8 @@ public interface TbContext { RuleChainService getRuleChainService(); + RuleEngineRpcService getRpcService(); + RuleEngineTelemetryService getTelemetryService(); TimeseriesService getTimeseriesService(); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java new file mode 100644 index 0000000000..e6455da04f --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +/** + * Created by ashvayka on 19.01.18. + */ +public final class TbRelationTypes { + + public static String SUCCESS = "Success"; + public static String FAILURE = "Failure"; + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java new file mode 100644 index 0000000000..9649768a9c --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rpc; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "rpc call reply", + configClazz = TbSendRpcReplyNodeConfiguration.class, + nodeDescription = "Sends reply to the RPC call from device", + nodeDetails = "Expects messages with any message type. Will forward message body to the device." +) +public class TbSendRPCReplyNode implements TbNode { + + private TbSendRpcReplyNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbSendRpcReplyNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute()); + if (msg.getOriginator().getEntityType() != EntityType.DEVICE) { + ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!")); + } else if (StringUtils.isEmpty(requestIdStr)) { + ctx.tellError(msg, new RuntimeException("Request id is not present in the metadata!")); + } else if (StringUtils.isEmpty(msg.getData())) { + ctx.tellError(msg, new RuntimeException("Request body is empty!")); + } else { + ctx.getRpcService().sendRpcReply(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData()); + } + } + + @Override + public void destroy() { + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java new file mode 100644 index 0000000000..937ced66ac --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -0,0 +1,101 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rpc; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbRelationTypes; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "rpc call request", + configClazz = TbSendRpcReplyNodeConfiguration.class, + nodeDescription = "Sends one-way RPC call to device", + nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes." +) +public class TbSendRPCRequestNode implements TbNode { + + private Random random = new Random(); + private Gson gson = new Gson(); + private JsonParser jsonParser = new JsonParser(); + private TbSendRpcRequestNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbSendRpcRequestNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject(); + + if (msg.getOriginator().getEntityType() != EntityType.DEVICE) { + ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!")); + } else if (!json.has("method")) { + ctx.tellError(msg, new RuntimeException("Method is not present in the message!")); + } else if (!json.has("params")) { + ctx.tellError(msg, new RuntimeException("Params are not present in the message!")); + } else { + int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt(); + RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder() + .method(gson.toJson(json.get("method"))) + .body(gson.toJson(json.get("params"))) + .deviceId(new DeviceId(msg.getOriginator().getId())) + .requestId(requestId) + .timeout(TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())) + .build(); + + ctx.getRpcService().sendRpcRequest(request, ruleEngineDeviceRpcResponse -> { + if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { + TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get()); + ctx.tellNext(next, TbRelationTypes.SUCCESS); + } else { + TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); + ctx.tellNext(next, TbRelationTypes.FAILURE); + ctx.tellError(msg, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); + } + }); + } + } + + @Override + public void destroy() { + } + + private String wrap(String name, String body) { + JsonObject json = new JsonObject(); + json.addProperty(name, body); + return gson.toJson(json); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java new file mode 100644 index 0000000000..402a33bc15 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rpc; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.DataConstants; + +@Data +public class TbSendRpcReplyNodeConfiguration implements NodeConfiguration { + + private String requestIdMetaDataAttribute; + + @Override + public TbSendRpcReplyNodeConfiguration defaultConfiguration() { + TbSendRpcReplyNodeConfiguration configuration = new TbSendRpcReplyNodeConfiguration(); + configuration.setRequestIdMetaDataAttribute("requestId"); + return configuration; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java new file mode 100644 index 0000000000..214ce659a9 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rpc; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; + +@Data +public class TbSendRpcRequestNodeConfiguration implements NodeConfiguration { + + private int timeoutInSeconds; + + @Override + public TbSendRpcRequestNodeConfiguration defaultConfiguration() { + TbSendRpcRequestNodeConfiguration configuration = new TbSendRpcRequestNodeConfiguration(); + configuration.setTimeoutInSeconds(60); + return configuration; + } +}