Browse Source

RPC

pull/748/head
Andrew Shvayka 8 years ago
parent
commit
3c7295ee8e
  1. 4
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 19
      application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
  3. 93
      application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
  4. 1
      application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
  5. 4
      application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java
  6. 33
      application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java
  7. 6
      application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
  8. 46
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  9. 88
      application/src/main/java/org/thingsboard/server/controller/RpcController.java
  10. 6
      application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
  11. 4
      application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
  12. 104
      application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
  13. 13
      application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
  14. 4
      application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
  15. 60
      application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
  16. 2
      application/src/main/resources/thingsboard.yml
  17. 6
      common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
  18. 2
      common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java
  19. 2
      common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java
  20. 6
      common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorClientSideRpcTimeoutMsg.java
  21. 33
      common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java
  22. 36
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
  23. 37
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java
  24. 30
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java
  25. 6
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  26. 26
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java
  27. 66
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java
  28. 101
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
  29. 33
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java
  30. 32
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java

4
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -199,6 +199,10 @@ public class ActorSystemContext {
@Getter
private long queuePersistenceTimeout;
@Value("${actors.client_side_rpc.timeout}")
@Getter
private long clientSideRpcTimeout;
@Value("${actors.rule.chain.error_persist_frequency}")
@Getter
private long ruleChainErrorPersistFrequency;

19
application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java

@ -25,12 +25,13 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
public class DeviceActor extends ContextAwareActor {
@ -62,10 +63,16 @@ public class DeviceActor extends ContextAwareActor {
processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
break;
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
processor.processRpcRequest(context(), (ToDeviceRpcRequestMsg) msg);
processor.processRpcRequest(context(), (ToDeviceRpcRequestActorMsg) msg);
break;
case DEVICE_ACTOR_RPC_TIMEOUT_MSG:
processor.processRpcTimeout(context(), (DeviceActorRpcTimeoutMsg) msg);
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
processor.processToServerRPCResponse(context(), (ToServerRpcResponseActorMsg) msg);
break;
case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG:
processor.processServerSideRpcTimeout(context(), (DeviceActorServerSideRpcTimeoutMsg) msg);
break;
case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
break;
case DEVICE_ACTOR_QUEUE_TIMEOUT_MSG:
processor.processQueueTimeout(context(), (DeviceActorQueueTimeoutMsg) msg);

93
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.DataConstants;
@ -53,28 +54,28 @@ import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
import org.thingsboard.server.common.msg.core.SessionCloseNotification;
import org.thingsboard.server.common.msg.core.SessionOpenMsg;
import org.thingsboard.server.common.msg.core.StatusCodeResponse;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorRpcTimeoutMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -87,7 +88,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -103,10 +103,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private final Map<SessionId, SessionInfo> sessions;
private final Map<SessionId, SessionInfo> attributeSubscriptions;
private final Map<SessionId, SessionInfo> rpcSubscriptions;
private final Map<Integer, ToDeviceRpcRequestMetadata> rpcPendingMap;
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
private final Map<Integer, ToServerRpcRequestMetadata> toServerRpcPendingMap;
private final Map<UUID, PendingSessionMsgData> pendingMsgs;
private final Gson gson = new Gson();
private final JsonParser jsonParser = new JsonParser();
private int rpcSeq = 0;
private String deviceName;
@ -120,7 +122,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.sessions = new HashMap<>();
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.rpcPendingMap = new HashMap<>();
this.toDeviceRpcPendingMap = new HashMap<>();
this.toServerRpcPendingMap = new HashMap<>();
this.pendingMsgs = new HashMap<>();
initAttributes();
}
@ -134,7 +137,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.defaultMetaData.putValue("deviceType", deviceType);
}
void processRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg) {
void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestBody body = request.getBody();
ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
@ -174,14 +177,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void registerPendingRpcRequest(ActorContext context, org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
rpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
DeviceActorRpcTimeoutMsg timeoutMsg = new DeviceActorRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
}
void processRpcTimeout(ActorContext context, DeviceActorRpcTimeoutMsg msg) {
ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(msg.getId());
void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) {
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
if (requestMd != null) {
logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
@ -200,7 +203,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
if (data != null) {
if (data != null && data.isReplyOnQueueAck()) {
logger.debug("[{}] Queue put [{}] ack detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
@ -208,8 +211,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
if (!rpcPendingMap.isEmpty()) {
logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, rpcPendingMap.size(), sessionId);
if (!toDeviceRpcPendingMap.isEmpty()) {
logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
if (type == SessionType.SYNC) {
logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
@ -219,12 +222,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
Set<Integer> sentOneWayIds = new HashSet<>();
if (type == SessionType.ASYNC) {
rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
} else {
rpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
}
sentOneWayIds.forEach(rpcPendingMap::remove);
sentOneWayIds.forEach(toDeviceRpcPendingMap::remove);
}
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
@ -263,8 +266,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
handlePostTelemetryRequest(context, msg);
break;
case TO_SERVER_RPC_REQUEST:
handleClientSideRPCRequest(context, msg);
break;
//TODO: push to queue and start processing!
}
}
}
@ -345,11 +348,47 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
pushToRuleEngineWithTimeout(context, tbMsg, src, request);
}
private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
ToServerRpcRequestMsg request = (ToServerRpcRequestMsg) src.getPayload();
JsonObject json = new JsonObject();
json.addProperty("method", request.getMethod());
json.add("params", jsonParser.parse(request.getParams()));
TbMsgMetaData requestMetaData = defaultMetaData.copy();
requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json));
pushToRuleEngineWithTimeout(context, tbMsg, src, request);
scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
}
public void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
if (data != null) {
logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
}
}
void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
if (data != null) {
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
}
}
private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) {
pushToRuleEngineWithTimeout(context, tbMsg, src, fromDeviceRequestMsg, true);
}
private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg, boolean replyOnAck) {
SessionMsgType sessionMsgType = fromDeviceRequestMsg.getMsgType();
int requestId = fromDeviceRequestMsg.getRequestId();
if (systemContext.isQueuePersistenceEnabled()) {
pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId));
pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId, replyOnAck));
scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
} else {
ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), src.getSessionId());
@ -394,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {
logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;
ToDeviceRpcRequestMetadata requestMd = rpcPendingMap.remove(responseMsg.getRequestId());
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
boolean success = requestMd != null;
if (success) {
systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getData(), null));

1
application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java

@ -32,5 +32,6 @@ public final class PendingSessionMsgData {
private final Optional<ServerAddress> serverAddress;
private final SessionMsgType sessionMsgType;
private final int requestId;
private final boolean replyOnQueueAck;
}

4
application/src/main/java/org/thingsboard/server/actors/device/ToDeviceRpcRequestMetadata.java

@ -16,13 +16,13 @@
package org.thingsboard.server.actors.device;
import lombok.Data;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
/**
* @author Andrew Shvayka
*/
@Data
public class ToDeviceRpcRequestMetadata {
private final ToDeviceRpcRequestMsg msg;
private final ToDeviceRpcRequestActorMsg msg;
private final boolean sent;
}

33
application/src/main/java/org/thingsboard/server/actors/device/ToServerRpcRequestMetadata.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.Data;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.session.SessionType;
import java.util.Optional;
/**
* @author Andrew Shvayka
*/
@Data
public class ToServerRpcRequestMetadata {
private final SessionId sessionId;
private final SessionType type;
private final Optional<ServerAddress> server;
}

6
application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java

@ -37,7 +37,7 @@ import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.rpc.GrpcSession;
import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import java.io.Serializable;
import java.util.UUID;
@ -142,14 +142,14 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
}
private static ToDeviceRpcRequestMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
return new ToDeviceRpcRequestMsg(serverAddress, request);
return new ToDeviceRpcRequestActorMsg(serverAddress, request);
}
private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {

46
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -16,15 +16,20 @@
package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Function;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
@ -41,6 +46,7 @@ import scala.concurrent.duration.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Created by ashvayka on 19.03.18.
@ -112,6 +118,11 @@ class DefaultTbContext implements TbContext {
nodeCtx.setSelf(self);
}
@Override
public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data);
}
@Override
public RuleNodeId getSelfId() {
return nodeCtx.getSelf().getId();
@ -206,4 +217,29 @@ class DefaultTbContext implements TbContext {
public MailService getMailService() {
return mainCtx.getMailService();
}
@Override
public RuleEngineRpcService getRpcService() {
return new RuleEngineRpcService() {
@Override
public void sendRpcReply(DeviceId deviceId, int requestId, String body) {
mainCtx.getDeviceRpcService().sendRpcReplyToDevice(nodeCtx.getTenantId(), deviceId, requestId, body);
}
@Override
public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
mainCtx.getDeviceRpcService().process(request, response -> {
consumer.accept(RuleEngineDeviceRpcResponse.builder()
.deviceId(src.getDeviceId())
.requestId(src.getRequestId())
.error(response.getError())
.response(response.getResponse())
.build());
});
}
};
}
}

88
application/src/main/java/org/thingsboard/server/controller/RpcController.java

@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@ -30,18 +31,22 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.actors.plugin.ValidationResult;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.rpc.RpcRequest;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
import org.thingsboard.server.extensions.api.plugins.PluginConstants;
import org.thingsboard.server.common.data.rpc.RpcRequest;
import org.thingsboard.server.service.rpc.LocalRequestMetaData;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.service.rpc.DeviceRpcService;
import org.thingsboard.server.service.rpc.LocalRequestMetaData;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
@ -53,6 +58,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
/**
* Created by ashvayka on 22.03.18.
@ -117,7 +123,6 @@ public class RpcController extends BaseController {
accessValidator.validate(currentUser, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
@Override
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
tenantId,
deviceId,
@ -125,7 +130,13 @@ public class RpcController extends BaseController {
timeout,
body
);
deviceRpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, result));
deviceRpcService.process(rpcRequest, new Consumer<FromDeviceRpcResponse>(){
@Override
public void accept(FromDeviceRpcResponse fromDeviceRpcResponse) {
reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse);
}
});
}
@Override
@ -136,7 +147,7 @@ public class RpcController extends BaseController {
} else {
entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
}
deviceRpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
response.setResult(entity);
}
}));
@ -146,4 +157,69 @@ public class RpcController extends BaseController {
}
}
public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
Optional<RpcError> rpcError = response.getError();
DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
if (rpcError.isPresent()) {
logRpcCall(rpcRequest, rpcError, null);
RpcError error = rpcError.get();
switch (error) {
case TIMEOUT:
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
break;
case NO_ACTIVE_CONNECTION:
responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
break;
default:
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
break;
}
} else {
Optional<String> responseData = response.getResponse();
if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
String data = responseData.get();
try {
logRpcCall(rpcRequest, rpcError, null);
responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
} catch (IOException e) {
log.debug("Failed to decode device response: {}", data, e);
logRpcCall(rpcRequest, rpcError, e);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
}
} else {
logRpcCall(rpcRequest, rpcError, null);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
}
}
}
private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
}
private void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
String rpcErrorStr = "";
if (rpcError.isPresent()) {
rpcErrorStr = "RPC Error: " + rpcError.get().name();
}
String method = body.getMethod();
String params = body.getParams();
auditLogService.logEntityAction(
user.getTenantId(),
user.getCustomerId(),
user.getId(),
user.getName(),
(UUIDBased & EntityId) entityId,
null,
ActionType.RPC_CALL,
BaseController.toException(e),
rpcErrorStr,
oneWay,
method,
params);
}
}

6
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java

@ -40,7 +40,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import javax.annotation.PreDestroy;
import java.io.IOException;
@ -132,7 +132,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
}
@Override
public void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward) {
public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
.setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
tell(serverAddress, msg);
@ -196,7 +196,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
).build();
}
private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestMsg msg) {
private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
ToDeviceRpcRequest request = msg.getMsg();

4
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java

@ -24,7 +24,7 @@ import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import java.util.UUID;
@ -41,7 +41,7 @@ public interface ClusterRpcService {
void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward);
void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);

104
application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.rpc;
import akka.actor.ActorRef;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -28,12 +27,15 @@ import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.controller.BaseController;
import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
@ -51,6 +53,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* Created by ashvayka on 27.03.18.
@ -59,8 +62,6 @@ import java.util.function.BiConsumer;
@Slf4j
public class DefaultDeviceRpcService implements DeviceRpcService {
private static final ObjectMapper jsonMapper = new ObjectMapper();
@Autowired
private ClusterRoutingService routingService;
@ -75,7 +76,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
private ScheduledExecutorService rpcCallBackExecutor;
private final ConcurrentMap<UUID, LocalRequestMetaData> localRpcRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
@PostConstruct
@ -91,18 +92,18 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
}
@Override
public void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData) {
public void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
sendRpcRequest(request);
UUID requestId = request.getId();
localRpcRequests.put(requestId, metaData);
localRpcRequests.put(requestId, responseConsumer);
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
log.error("[{}] processing the request: [{}]", this.hashCode(), requestId);
rpcCallBackExecutor.schedule(() -> {
log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId);
if (localMetaData != null) {
reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
if (consumer != null) {
consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
}
}, timeout, TimeUnit.MILLISECONDS);
}
@ -123,58 +124,27 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
//TODO: send to another server if needed.
UUID requestId = response.getId();
LocalRequestMetaData md = localRpcRequests.remove(requestId);
if (md != null) {
log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId());
reply(md, response);
Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
if (consumer != null) {
consumer.accept(response);
} else {
log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
}
}
public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
Optional<RpcError> rpcError = response.getError();
DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
if (rpcError.isPresent()) {
logRpcCall(rpcRequest, rpcError, null);
RpcError error = rpcError.get();
switch (error) {
case TIMEOUT:
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
break;
case NO_ACTIVE_CONNECTION:
responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
break;
default:
responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
break;
}
} else {
Optional<String> responseData = response.getResponse();
if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
String data = responseData.get();
try {
logRpcCall(rpcRequest, rpcError, null);
responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
} catch (IOException e) {
log.debug("Failed to decode device response: {}", data, e);
logRpcCall(rpcRequest, rpcError, e);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
}
} else {
logRpcCall(rpcRequest, rpcError, null);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
}
}
@Override
public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
forward(deviceId, rpcMsg, rpcService::tell);
}
private void sendRpcRequest(ToDeviceRpcRequest msg) {
log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
ToDeviceRpcRequestMsg rpcMsg = new ToDeviceRpcRequestMsg(msg);
ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
}
private void forward(DeviceId deviceId, ToDeviceRpcRequestMsg msg, BiConsumer<ServerAddress, ToDeviceRpcRequestMsg> rpcFunction) {
private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
Optional<ServerAddress> instance = routingService.resolveById(deviceId);
if (instance.isPresent()) {
log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
@ -184,32 +154,4 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
actorService.onMsg(msg);
}
}
private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
}
@Override
public void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
String rpcErrorStr = "";
if (rpcError.isPresent()) {
rpcErrorStr = "RPC Error: " + rpcError.get().name();
}
String method = body.getMethod();
String params = body.getParams();
auditLogService.logEntityAction(
user.getTenantId(),
user.getCustomerId(),
user.getId(),
user.getName(),
(UUIDBased & EntityId) entityId,
null,
ActionType.RPC_CALL,
BaseController.toException(e),
rpcErrorStr,
oneWay,
method,
params);
}
}

13
application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java

@ -15,26 +15,25 @@
*/
package org.thingsboard.server.service.rpc;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.Optional;
import java.util.function.Consumer;
/**
* Created by ashvayka on 16.04.18.
*/
public interface DeviceRpcService {
void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData);
void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
void process(ToDeviceRpcRequest request, ServerAddress originator);
void process(FromDeviceRpcResponse response);
void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e);
void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
}

4
application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestMsg.java → application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java

@ -32,13 +32,13 @@ import java.util.Optional;
*/
@ToString
@RequiredArgsConstructor
public class ToDeviceRpcRequestMsg implements ToDeviceActorNotificationMsg {
public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg {
private final ServerAddress serverAddress;
@Getter
private final ToDeviceRpcRequest msg;
public ToDeviceRpcRequestMsg(ToDeviceRpcRequest msg) {
public ToDeviceRpcRequestActorMsg(ToDeviceRpcRequest msg) {
this(null, msg);
}

60
application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.rpc;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import java.util.Optional;
/**
* Created by ashvayka on 16.04.18.
*/
@ToString
@RequiredArgsConstructor
public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
private final ServerAddress serverAddress;
@Getter
private final TenantId tenantId;
@Getter
private final DeviceId deviceId;
@Getter
private final ToServerRpcResponseMsg msg;
public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) {
this(null, tenantId, deviceId, msg);
}
public Optional<ServerAddress> getServerAddress() {
return Optional.ofNullable(serverAddress);
}
@Override
public MsgType getMsgType() {
return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
}
}

2
application/src/main/resources/thingsboard.yml

@ -229,6 +229,8 @@ actors:
enabled: "${ACTORS_QUEUE_ENABLED:true}"
# Maximum allowed timeout for persistence into the queue
timeout: "${ACTORS_QUEUE_PERSISTENCE_TIMEOUT:30000}"
client_side_rpc:
timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}"
cache:
# caffeine or redis

6
common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java

@ -75,7 +75,11 @@ public enum MsgType {
DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG,
DEVICE_ACTOR_RPC_TIMEOUT_MSG,
SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG,
DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG,
DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG,
DEVICE_ACTOR_QUEUE_TIMEOUT_MSG,

2
common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineError.java

@ -21,7 +21,7 @@ package org.thingsboard.server.common.msg.core;
public enum RuleEngineError {
QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true);
QUEUE_PUT_TIMEOUT(true), SERVER_ERROR(true), TIMEOUT;
private final boolean critical;

2
common/message/src/main/java/org/thingsboard/server/common/msg/core/RuleEngineErrorMsg.java

@ -44,6 +44,8 @@ public class RuleEngineErrorMsg implements ToDeviceMsg {
return "Timeout during persistence of the message to the queue!";
case SERVER_ERROR:
return "Error during processing of message by the server!";
case TIMEOUT:
return "Timeout during processing of message by the server!";
default:
throw new RuntimeException("Error " + error + " is not supported!");
}

6
common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorRpcTimeoutMsg.java → common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorClientSideRpcTimeoutMsg.java

@ -20,14 +20,14 @@ import org.thingsboard.server.common.msg.MsgType;
/**
* @author Andrew Shvayka
*/
public final class DeviceActorRpcTimeoutMsg extends TimeoutMsg<Integer> {
public final class DeviceActorClientSideRpcTimeoutMsg extends TimeoutMsg<Integer> {
public DeviceActorRpcTimeoutMsg(Integer id, long timeout) {
public DeviceActorClientSideRpcTimeoutMsg(Integer id, long timeout) {
super(id, timeout);
}
@Override
public MsgType getMsgType() {
return MsgType.DEVICE_ACTOR_RPC_TIMEOUT_MSG;
return MsgType.DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG;
}
}

33
common/message/src/main/java/org/thingsboard/server/common/msg/timeout/DeviceActorServerSideRpcTimeoutMsg.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.timeout;
import org.thingsboard.server.common.msg.MsgType;
/**
* @author Andrew Shvayka
*/
public final class DeviceActorServerSideRpcTimeoutMsg extends TimeoutMsg<Integer> {
public DeviceActorServerSideRpcTimeoutMsg(Integer id, long timeout) {
super(id, timeout);
}
@Override
public MsgType getMsgType() {
return MsgType.DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG;
}
}

36
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.id.DeviceId;
/**
* Created by ashvayka on 02.04.18.
*/
@Data
@Builder
public final class RuleEngineDeviceRpcRequest {
private final DeviceId deviceId;
private final int requestId;
private final boolean oneway;
private final String method;
private final String body;
private final long timeout;
}

37
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcResponse.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import java.util.Optional;
/**
* Created by ashvayka on 02.04.18.
*/
@Data
@Builder
public final class RuleEngineDeviceRpcResponse {
private final DeviceId deviceId;
private final int requestId;
private final Optional<String> response;
private final Optional<RpcError> error;
}

30
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.DeviceId;
import java.util.function.Consumer;
/**
* Created by ashvayka on 02.04.18.
*/
public interface RuleEngineRpcService {
void sendRpcReply(DeviceId deviceId, int requestId, String body);
void sendRpcRequest(RuleEngineDeviceRpcRequest request, Consumer<RuleEngineDeviceRpcResponse> consumer);
}

6
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -15,10 +15,12 @@
*/
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
@ -58,6 +60,8 @@ public interface TbContext {
void updateSelf(RuleNode self);
TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data);
RuleNodeId getSelfId();
TenantId getTenantId();
@ -78,6 +82,8 @@ public interface TbContext {
RuleChainService getRuleChainService();
RuleEngineRpcService getRpcService();
RuleEngineTelemetryService getTelemetryService();
TimeseriesService getTimeseriesService();

26
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbRelationTypes.java

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
/**
* Created by ashvayka on 19.01.18.
*/
public final class TbRelationTypes {
public static String SUCCESS = "Success";
public static String FAILURE = "Failure";
}

66
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rpc;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "rpc call reply",
configClazz = TbSendRpcReplyNodeConfiguration.class,
nodeDescription = "Sends reply to the RPC call from device",
nodeDetails = "Expects messages with any message type. Will forward message body to the device."
)
public class TbSendRPCReplyNode implements TbNode {
private TbSendRpcReplyNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbSendRpcReplyNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute());
if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!"));
} else if (StringUtils.isEmpty(requestIdStr)) {
ctx.tellError(msg, new RuntimeException("Request id is not present in the metadata!"));
} else if (StringUtils.isEmpty(msg.getData())) {
ctx.tellError(msg, new RuntimeException("Request body is empty!"));
} else {
ctx.getRpcService().sendRpcReply(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData());
}
}
@Override
public void destroy() {
}
}

101
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java

@ -0,0 +1,101 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rpc;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "rpc call request",
configClazz = TbSendRpcReplyNodeConfiguration.class,
nodeDescription = "Sends one-way RPC call to device",
nodeDetails = "Expects messages with \"method\" and \"params\". Will forward response from device to next nodes."
)
public class TbSendRPCRequestNode implements TbNode {
private Random random = new Random();
private Gson gson = new Gson();
private JsonParser jsonParser = new JsonParser();
private TbSendRpcRequestNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbSendRpcRequestNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
JsonObject json = jsonParser.parse(msg.getData()).getAsJsonObject();
if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
ctx.tellError(msg, new RuntimeException("Message originator is not a device entity!"));
} else if (!json.has("method")) {
ctx.tellError(msg, new RuntimeException("Method is not present in the message!"));
} else if (!json.has("params")) {
ctx.tellError(msg, new RuntimeException("Params are not present in the message!"));
} else {
int requestId = json.has("requestId") ? json.get("requestId").getAsInt() : random.nextInt();
RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
.method(gson.toJson(json.get("method")))
.body(gson.toJson(json.get("params")))
.deviceId(new DeviceId(msg.getOriginator().getId()))
.requestId(requestId)
.timeout(TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()))
.build();
ctx.getRpcService().sendRpcRequest(request, ruleEngineDeviceRpcResponse -> {
if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().get());
ctx.tellNext(next, TbRelationTypes.SUCCESS);
} else {
TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
ctx.tellNext(next, TbRelationTypes.FAILURE);
ctx.tellError(msg, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
}
});
}
}
@Override
public void destroy() {
}
private String wrap(String name, String body) {
JsonObject json = new JsonObject();
json.addProperty(name, body);
return gson.toJson(json);
}
}

33
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcReplyNodeConfiguration.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rpc;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.server.common.data.DataConstants;
@Data
public class TbSendRpcReplyNodeConfiguration implements NodeConfiguration<TbSendRpcReplyNodeConfiguration> {
private String requestIdMetaDataAttribute;
@Override
public TbSendRpcReplyNodeConfiguration defaultConfiguration() {
TbSendRpcReplyNodeConfiguration configuration = new TbSendRpcReplyNodeConfiguration();
configuration.setRequestIdMetaDataAttribute("requestId");
return configuration;
}
}

32
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRpcRequestNodeConfiguration.java

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rpc;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbSendRpcRequestNodeConfiguration implements NodeConfiguration<TbSendRpcRequestNodeConfiguration> {
private int timeoutInSeconds;
@Override
public TbSendRpcRequestNodeConfiguration defaultConfiguration() {
TbSendRpcRequestNodeConfiguration configuration = new TbSendRpcRequestNodeConfiguration();
configuration.setTimeoutInSeconds(60);
return configuration;
}
}
Loading…
Cancel
Save