Browse Source

Code cleanup

pull/7592/head
Volodymyr Babak 4 years ago
parent
commit
5ff8144f8d
  1. 16
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java
  2. 10
      common/edge-api/src/main/proto/edge.proto
  3. 5
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java
  4. 33
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java

16
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java

@ -369,36 +369,22 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
private ListenableFuture<Void> processDeviceRpcRequestFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) {
DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB()));
UUID requestUUID = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB());
try {
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.createObjectNode();
TbMsgMetaData metaData = new TbMsgMetaData();
String requestId = Integer.toString(deviceRpcCallMsg.getRequestId());
metaData.putValue("requestId", requestId);
metaData.putValue("requestUUID", requestUUID.toString());
metaData.putValue("serviceId", deviceRpcCallMsg.getServiceId());
metaData.putValue("sessionId", deviceRpcCallMsg.getSessionId());
metaData.putValue("expirationTime", Long.toString(deviceRpcCallMsg.getExpirationTime()));
metaData.putValue("oneway", Boolean.toString(deviceRpcCallMsg.getOneway()));
metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(deviceRpcCallMsg.getPersisted()));
if (deviceRpcCallMsg.getRetries() > 0) {
metaData.putValue(DataConstants.RETRIES, Integer.toString(deviceRpcCallMsg.getRetries()));
}
metaData.putValue(DataConstants.EDGE_ID, edge.getId().toString());
Device device = deviceService.findDeviceById(tenantId, deviceId);
if (device != null) {
metaData.putValue("deviceName", device.getName());
metaData.putValue("deviceType", device.getType());
metaData.putValue(DataConstants.DEVICE_ID, deviceId.getId().toString());
}
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.createObjectNode();
entityNode.put("method", deviceRpcCallMsg.getRequestMsg().getMethod());
entityNode.put("params", deviceRpcCallMsg.getRequestMsg().getParams());
entityNode.put(DataConstants.ADDITIONAL_INFO, deviceRpcCallMsg.getAdditionalInfo());
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, null, metaData,
TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {

10
common/edge-api/src/main/proto/edge.proto

@ -430,11 +430,11 @@ message DeviceRpcCallMsg {
bool oneway = 7;
RpcRequestMsg requestMsg = 8;
RpcResponseMsg responseMsg = 9;
bool persisted = 10;
int32 retries = 11;
string additionalInfo = 12;
string serviceId = 13;
string sessionId = 14;
optional bool persisted = 10;
optional int32 retries = 11;
optional string additionalInfo = 12;
optional string serviceId = 13;
optional string sessionId = 14;
}
message RpcRequestMsg {

5
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java

@ -142,8 +142,11 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
} else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
actionType = EdgeEventActionType.POST_ATTRIBUTES;
} else {
} else if (DataConstants.ATTRIBUTES_DELETED.equals(msgType)) {
actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
} else {
log.warn("Unsupported msg type [{}]", msgType);
throw new IllegalArgumentException("Unsupported msg type: " + msgType);
}
return actionType;
}

33
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.rule.engine.rpc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -80,11 +79,7 @@ public class TbSendRPCReplyNode implements TbNode {
ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
} else {
if (StringUtils.isNotBlank(msg.getMetaData().getValue(DataConstants.EDGE_ID))) {
try {
saveRpcResponseToEdgeQueue(ctx, msg, serviceIdStr, sessionIdStr, requestIdStr);
} catch (Exception e) {
ctx.tellFailure(msg, e);
}
saveRpcResponseToEdgeQueue(ctx, msg, serviceIdStr, sessionIdStr, requestIdStr);
} else {
ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData());
ctx.tellSuccess(msg);
@ -92,36 +87,16 @@ public class TbSendRPCReplyNode implements TbNode {
}
}
private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg, String serviceIdStr, String sessionIdStr, String requestIdStr) throws JsonProcessingException {
// EdgeEvent edgeEvent = new EdgeEvent();
// edgeEvent.setTenantId(tenantId);
// edgeEvent.setAction(eventAction);
// edgeEvent.setEntityId(entityId);
// edgeEvent.setType(eventType);
// edgeEvent.setBody(entityBody);
// edgeEvent.setEdgeId(edgeId);
//
private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg, String serviceIdStr, String sessionIdStr, String requestIdStr) {
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode();
body.put("serviceId", serviceIdStr);
body.put("sessionId", sessionIdStr);
body.put("requestId", requestIdStr);
body.put("response", JacksonUtil.OBJECT_MAPPER.writeValueAsString(msg.getData()));
// body.put("requestUUID", msg.getId().toString());
// body.put("oneway", msg.isOneway());
// body.put("expirationTime", msg.getExpirationTime());
// body.put("method", msg.getBody().getMethod());
// body.put("params", msg.getBody().getParams());
// body.put("persisted", msg.isPersisted());
// body.put("retries", msg.getRetries());
// body.put("additionalInfo", msg.getAdditionalInfo());
body.put("response", msg.getData());
EdgeId edgeId = new EdgeId(UUID.fromString(msg.getMetaData().getValue(DataConstants.EDGE_ID)));
DeviceId deviceId = new DeviceId(UUID.fromString(msg.getMetaData().getValue(DataConstants.DEVICE_ID)));
// TODO: add body
EdgeEvent edgeEvent =
EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
EdgeEventActionType.RPC_CALL_RESPONSE, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body));
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
Futures.addCallback(future, new FutureCallback<Void>() {
@Override

Loading…
Cancel
Save