From 22118af4db7cc2fba246dc8f82f313983a46121a Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 8 Nov 2022 13:13:58 +0200 Subject: [PATCH] Added Edge RPC response/request support --- .../device/DeviceActorMessageProcessor.java | 5 +- .../service/edge/rpc/EdgeGrpcSession.java | 5 +- .../rpc/constructor/DeviceMsgConstructor.java | 46 +++++-- .../rpc/processor/DeviceEdgeProcessor.java | 113 ++++++++++++++++-- .../rpc/processor/EdgeRpcRequestMetadata.java | 28 +++++ .../server/edge/BaseDeviceEdgeTest.java | 3 +- .../server/common/data/DataConstants.java | 2 + .../common/data/edge/EdgeEventActionType.java | 3 +- common/edge-api/src/main/proto/edge.proto | 3 + .../rule/engine/api/RuleEngineRpcService.java | 2 - .../rule/engine/rpc/TbSendRPCReplyNode.java | 62 +++++++++- 11 files changed, 244 insertions(+), 28 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeRpcRequestMetadata.java diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 2ffe2002f9..9b95b188d2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -823,8 +823,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { body.put("expirationTime", msg.getExpirationTime()); body.put("method", msg.getBody().getMethod()); body.put("params", msg.getBody().getParams()); + body.put("persisted", msg.isPersisted()); + body.put("retries", msg.getRetries()); + body.put("additionalInfo", msg.getAdditionalInfo()); - EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body); + EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL_REQUEST, deviceId, body); return Futures.transform(systemContext.getEdgeEventService().saveAsync(edgeEvent), unused -> { systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 2342997c65..aca2f8b562 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -463,7 +463,8 @@ public final class EdgeGrpcSession implements Closeable { case UNASSIGNED_FROM_CUSTOMER: case CREDENTIALS_REQUEST: case ENTITY_MERGE_REQUEST: - case RPC_CALL: + case RPC_CALL_REQUEST: + case RPC_CALL_RESPONSE: downlinkMsg = convertEntityEventToDownlink(edgeEvent); log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg); break; @@ -611,7 +612,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) { for (DeviceRpcCallMsg deviceRpcCallMsg : uplinkMsg.getDeviceRpcCallMsgList()) { - result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseFromEdge(edge.getTenantId(), deviceRpcCallMsg)); + result.add(ctx.getDeviceProcessor().processDeviceRpcCallFromEdge(edge.getTenantId(), edge, deviceRpcCallMsg)); } } if (uplinkMsg.getWidgetBundleTypesRequestMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java index 511910dd8a..49438792fa 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.v1.RpcRequestMsg; +import org.thingsboard.server.gen.edge.v1.RpcResponseMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -96,26 +97,51 @@ public class DeviceMsgConstructor { .setIdLSB(deviceId.getId().getLeastSignificantBits()).build(); } - public DeviceRpcCallMsg constructDeviceRpcCallMsg(UUID deviceId, JsonNode body) { - int requestId = body.get("requestId").asInt(); - boolean oneway = body.get("oneway").asBoolean(); - UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); - long expirationTime = body.get("expirationTime").asLong(); + public DeviceRpcCallMsg constructDeviceRpcRequestMsg(UUID deviceId, JsonNode body) { + DeviceRpcCallMsg.Builder builder = constructDeviceRpcMsg(deviceId, body); + String method = body.get("method").asText(); String params = body.get("params").asText(); - RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder(); requestBuilder.setMethod(method); requestBuilder.setParams(params); - DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder() + builder.setRequestMsg(requestBuilder.build()); + + return builder.build(); + } + + public DeviceRpcCallMsg constructDeviceRpcResponseMsg(UUID deviceId, JsonNode body) { + DeviceRpcCallMsg.Builder builder = constructDeviceRpcMsg(deviceId, body); + + RpcResponseMsg.Builder responseBuilder = RpcResponseMsg.newBuilder(); + if (body.has("error")) { + responseBuilder.setError(body.get("error").asText()); + } else { + responseBuilder.setResponse(body.get("response").asText()); + } + builder.setResponseMsg(responseBuilder.build()); + + return builder.build(); + } + + private DeviceRpcCallMsg.Builder constructDeviceRpcMsg(UUID deviceId, JsonNode body) { + int requestId = body.get("requestId").asInt(); + boolean oneway = body.get("oneway").asBoolean(); + UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); + long expirationTime = body.get("expirationTime").asLong(); + boolean persisted = body.get("persisted").asBoolean(); + int retries = body.get("retries").asInt(); + String additionalInfo = body.get("additionalInfo").asText(); + return DeviceRpcCallMsg.newBuilder() .setDeviceIdMSB(deviceId.getMostSignificantBits()) .setDeviceIdLSB(deviceId.getLeastSignificantBits()) .setRequestUuidMSB(requestUUID.getMostSignificantBits()) .setRequestUuidLSB(requestUUID.getLeastSignificantBits()) - .setRequestId(requestId) .setExpirationTime(expirationTime) + .setRequestId(requestId) .setOneway(oneway) - .setRequestMsg(requestBuilder.build()); - return builder.build(); + .setPersisted(persisted) + .setRetries(retries) + .setAdditionalInfo(additionalInfo); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index e09e036953..235fef7431 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -52,6 +53,7 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; @@ -66,8 +68,14 @@ import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; +import javax.annotation.PostConstruct; +import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @Component @@ -75,11 +83,19 @@ import java.util.concurrent.locks.ReentrantLock; @TbCoreComponent public class DeviceEdgeProcessor extends BaseEdgeProcessor { + private final Map toServerRpcPendingMap = new ConcurrentHashMap<>(); + private ScheduledExecutorService scheduler; + @Autowired private DataDecodingEncodingService dataDecodingEncodingService; private static final ReentrantLock deviceCreationLock = new ReentrantLock(); + @PostConstruct + public void init(){ + this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-edge-processor-scheduler")); + } + public ListenableFuture processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { log.trace("[{}] onDeviceUpdate [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); switch (deviceUpdateMsg.getMsgType()) { @@ -325,8 +341,17 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return metaData; } - public ListenableFuture processDeviceRpcCallResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { - log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg); + public ListenableFuture processDeviceRpcCallFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) { + log.trace("[{}] processDeviceRpcCallFromEdge [{}]", tenantId, deviceRpcCallMsg); + if (deviceRpcCallMsg.hasResponseMsg()) { + return processDeviceRpcResponseFromEdge(tenantId, deviceRpcCallMsg); + } else if (deviceRpcCallMsg.hasRequestMsg()) { + return processDeviceRpcRequestFromEdge(tenantId, edge, deviceRpcCallMsg); + } + return Futures.immediateFuture(null); + } + + private ListenableFuture processDeviceRpcResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { SettableFuture futureToSet = SettableFuture.create(); UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); @@ -357,6 +382,68 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return futureToSet; } + private ListenableFuture processDeviceRpcRequestFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) { + DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); + UUID requestUUID = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); + try { + ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + TbMsgMetaData metaData = new TbMsgMetaData(); + String requestId = Integer.toString(deviceRpcCallMsg.getRequestId()); + metaData.putValue("requestId", requestId); + metaData.putValue("requestUUID", requestUUID.toString()); + // ?? metaData.putValue("originServiceId", deviceRpcRequestMsg.get); + metaData.putValue("expirationTime", Long.toString(deviceRpcCallMsg.getExpirationTime())); + metaData.putValue("oneway", Boolean.toString(deviceRpcCallMsg.getOneway())); + metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(deviceRpcCallMsg.getPersisted())); + + if (deviceRpcCallMsg.getRetries() > 0) { + metaData.putValue(DataConstants.RETRIES, Integer.toString(deviceRpcCallMsg.getRetries())); + } + + metaData.putValue(DataConstants.EDGE_ID, edge.getId().toString()); + + Device device = deviceService.findDeviceById(tenantId, deviceId); + if (device != null) { + metaData.putValue("deviceName", device.getName()); + metaData.putValue("deviceType", device.getType()); + metaData.putValue(DataConstants.DEVICE_ID, deviceId.getId().toString()); + } + + entityNode.put("method", deviceRpcCallMsg.getRequestMsg().getMethod()); + entityNode.put("params", deviceRpcCallMsg.getRequestMsg().getParams()); + + entityNode.put(DataConstants.ADDITIONAL_INFO, deviceRpcCallMsg.getAdditionalInfo()); + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, null, metaData, + TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); + tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device); + } + + @Override + public void onFailure(Throwable t) { + log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", device, t); + } + }); + toServerRpcPendingMap.put(requestId, new EdgeRpcRequestMetadata(tenantId, edge.getId(), deviceId)); + scheduler.schedule(() -> processTimeout(requestId), 60000, TimeUnit.MILLISECONDS); + } catch (JsonProcessingException | IllegalArgumentException e) { + log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e); + } + + return Futures.immediateFuture(null); + } + + private void processTimeout(String requestId) { + EdgeRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); + if (data != null) { + // TODO: add failure body + saveEdgeEvent(data.getTenantId(), data.getEdgeId(), EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL_RESPONSE, + data.getDeviceId(), JacksonUtil.OBJECT_MAPPER.valueToTree("{}")); + } + } + public DownlinkMsg convertDeviceEventToDownlink(EdgeEvent edgeEvent) { DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); DownlinkMsg downlinkMsg = null; @@ -401,8 +488,10 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { .build(); } break; - case RPC_CALL: - return convertRpcCallEventToDownlink(edgeEvent); + case RPC_CALL_REQUEST: + return convertRpcCallRequestEventToDownlink(edgeEvent); + case RPC_CALL_RESPONSE: + return convertRpcCallResponseEventToDownlink(edgeEvent); case CREDENTIALS_REQUEST: return convertCredentialsRequestEventToDownlink(edgeEvent); case ENTITY_MERGE_REQUEST: @@ -411,10 +500,18 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return downlinkMsg; } - private DownlinkMsg convertRpcCallEventToDownlink(EdgeEvent edgeEvent) { - log.trace("Executing convertRpcCallEventToDownlink, edgeEvent [{}]", edgeEvent); - DeviceRpcCallMsg deviceRpcCallMsg = - deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); + private DownlinkMsg convertRpcCallRequestEventToDownlink(EdgeEvent edgeEvent) { + log.trace("Executing convertRpcCallRequestEventToDownlink, edgeEvent [{}]", edgeEvent); + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addDeviceRpcCallMsg(deviceMsgConstructor.constructDeviceRpcRequestMsg(edgeEvent.getEntityId(), edgeEvent.getBody())) + .build(); + } + + private DownlinkMsg convertRpcCallResponseEventToDownlink(EdgeEvent edgeEvent) { + log.trace("Executing convertRpcCallResponseEventToDownlink, edgeEvent [{}]", edgeEvent); + DeviceRpcCallMsg deviceRpcCallMsg = deviceMsgConstructor.constructDeviceRpcResponseMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); + toServerRpcPendingMap.remove(Integer.toString(deviceRpcCallMsg.getRequestId())); return DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .addDeviceRpcCallMsg(deviceRpcCallMsg) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeRpcRequestMetadata.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeRpcRequestMetadata.java new file mode 100644 index 0000000000..a5a71d0743 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeRpcRequestMetadata.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.Data; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; + +@Data +public class EdgeRpcRequestMetadata { + private final TenantId tenantId; + private final EdgeId edgeId; + private final DeviceId deviceId; +} diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 392a6814d4..8e8c616d7a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -512,7 +512,8 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { body.put("method", "test_method"); body.put("params", "{\"param1\":\"value1\"}"); - EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body); + EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL_REQUEST, + device.getId().getId(), EdgeEventType.DEVICE, body); edgeImitator.expectMessageAmount(1); edgeEventService.saveAsync(edgeEvent).get(); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 85b9e681a1..69c36697d3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -40,6 +40,8 @@ public class DataConstants { public static final String EXPIRATION_TIME = "expirationTime"; public static final String ADDITIONAL_INFO = "additionalInfo"; public static final String RETRIES = "retries"; + public static final String EDGE_ID = "edgeId"; + public static final String DEVICE_ID = "deviceId"; public static final String COAP_TRANSPORT_NAME = "COAP"; public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; public static final String MQTT_TRANSPORT_NAME = "MQTT"; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index bba8767fca..7fae6c56b1 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -28,7 +28,8 @@ public enum EdgeEventActionType { UNASSIGNED_FROM_CUSTOMER, RELATION_ADD_OR_UPDATE, RELATION_DELETED, - RPC_CALL, + RPC_CALL_REQUEST, + RPC_CALL_RESPONSE, ALARM_ACK, ALARM_CLEAR, ASSIGNED_TO_EDGE, diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index a2e3b5989b..a5785d6870 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -430,6 +430,9 @@ message DeviceRpcCallMsg { bool oneway = 7; RpcRequestMsg requestMsg = 8; RpcResponseMsg responseMsg = 9; + bool persisted = 10; + int32 retries = 11; + string additionalInfo = 12; } message RpcRequestMsg { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java index abb5ace0f8..85596d7e7c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineRpcService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.rule.engine.api; -import org.thingsboard.server.common.data.id.DeviceId; - import java.util.UUID; import java.util.function.Consumer; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java index ce9b5050f0..22d13f196b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java @@ -15,15 +15,27 @@ */ package org.thingsboard.rule.engine.rpc; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.StringUtils; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -65,9 +77,53 @@ public class TbSendRPCReplyNode implements TbNode { } else if (StringUtils.isEmpty(msg.getData())) { ctx.tellFailure(msg, new RuntimeException("Request body is empty!")); } else { - ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData()); - ctx.tellSuccess(msg); + if (StringUtils.isNotBlank(msg.getMetaData().getValue(DataConstants.EDGE_ID))) { + saveRpcResponseToEdgeQueue(ctx, msg); + } else { + ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData()); + ctx.tellSuccess(msg); + } } } + private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg) { +// EdgeEvent edgeEvent = new EdgeEvent(); +// edgeEvent.setTenantId(tenantId); +// edgeEvent.setAction(eventAction); +// edgeEvent.setEntityId(entityId); +// edgeEvent.setType(eventType); +// edgeEvent.setBody(entityBody); +// edgeEvent.setEdgeId(edgeId); +// +// ObjectNode body = mapper.createObjectNode(); +// body.put("requestId", requestId); +// body.put("requestUUID", msg.getId().toString()); +// body.put("oneway", msg.isOneway()); +// body.put("expirationTime", msg.getExpirationTime()); +// body.put("method", msg.getBody().getMethod()); +// body.put("params", msg.getBody().getParams()); +// body.put("persisted", msg.isPersisted()); +// body.put("retries", msg.getRetries()); +// body.put("additionalInfo", msg.getAdditionalInfo()); + + EdgeId edgeId = new EdgeId(UUID.fromString(msg.getMetaData().getValue(DataConstants.EDGE_ID))); + DeviceId deviceId = new DeviceId(UUID.fromString(msg.getMetaData().getValue(DataConstants.DEVICE_ID))); + // TODO: add body + EdgeEvent edgeEvent = + EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE, + EdgeEventActionType.RPC_CALL_RESPONSE, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree("{}")); + + ListenableFuture future = ctx.getEdgeEventService().saveAsync(edgeEvent); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); + ctx.tellSuccess(msg); + } + + @Override + public void onFailure(Throwable t) { + } + }, ctx.getDbCallbackExecutor()); + } }