|
|
|
@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
import org.thingsboard.server.common.data.Device; |
|
|
|
import org.thingsboard.server.common.data.DeviceProfile; |
|
|
|
@ -52,6 +53,7 @@ import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgDataType; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|
|
|
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; |
|
|
|
import org.thingsboard.server.common.msg.session.SessionMsgType; |
|
|
|
import org.thingsboard.server.dao.exception.DataValidationException; |
|
|
|
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; |
|
|
|
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; |
|
|
|
@ -66,8 +68,14 @@ import org.thingsboard.server.queue.util.DataDecodingEncodingService; |
|
|
|
import org.thingsboard.server.queue.util.TbCoreComponent; |
|
|
|
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.Executors; |
|
|
|
import java.util.concurrent.ScheduledExecutorService; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.locks.ReentrantLock; |
|
|
|
|
|
|
|
@Component |
|
|
|
@ -75,11 +83,19 @@ import java.util.concurrent.locks.ReentrantLock; |
|
|
|
@TbCoreComponent |
|
|
|
public class DeviceEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
|
|
|
|
private final Map<String, EdgeRpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>(); |
|
|
|
private ScheduledExecutorService scheduler; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private DataDecodingEncodingService dataDecodingEncodingService; |
|
|
|
|
|
|
|
private static final ReentrantLock deviceCreationLock = new ReentrantLock(); |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init(){ |
|
|
|
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-edge-processor-scheduler")); |
|
|
|
} |
|
|
|
|
|
|
|
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { |
|
|
|
log.trace("[{}] onDeviceUpdate [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); |
|
|
|
switch (deviceUpdateMsg.getMsgType()) { |
|
|
|
@ -325,8 +341,17 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
return metaData; |
|
|
|
} |
|
|
|
|
|
|
|
public ListenableFuture<Void> processDeviceRpcCallResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { |
|
|
|
log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg); |
|
|
|
public ListenableFuture<Void> processDeviceRpcCallFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) { |
|
|
|
log.trace("[{}] processDeviceRpcCallFromEdge [{}]", tenantId, deviceRpcCallMsg); |
|
|
|
if (deviceRpcCallMsg.hasResponseMsg()) { |
|
|
|
return processDeviceRpcResponseFromEdge(tenantId, deviceRpcCallMsg); |
|
|
|
} else if (deviceRpcCallMsg.hasRequestMsg()) { |
|
|
|
return processDeviceRpcRequestFromEdge(tenantId, edge, deviceRpcCallMsg); |
|
|
|
} |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Void> processDeviceRpcResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { |
|
|
|
SettableFuture<Void> futureToSet = SettableFuture.create(); |
|
|
|
UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); |
|
|
|
DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); |
|
|
|
@ -357,6 +382,68 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
return futureToSet; |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Void> processDeviceRpcRequestFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) { |
|
|
|
DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); |
|
|
|
UUID requestUUID = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); |
|
|
|
try { |
|
|
|
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.createObjectNode(); |
|
|
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
|
|
|
String requestId = Integer.toString(deviceRpcCallMsg.getRequestId()); |
|
|
|
metaData.putValue("requestId", requestId); |
|
|
|
metaData.putValue("requestUUID", requestUUID.toString()); |
|
|
|
// ?? metaData.putValue("originServiceId", deviceRpcRequestMsg.get);
|
|
|
|
metaData.putValue("expirationTime", Long.toString(deviceRpcCallMsg.getExpirationTime())); |
|
|
|
metaData.putValue("oneway", Boolean.toString(deviceRpcCallMsg.getOneway())); |
|
|
|
metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(deviceRpcCallMsg.getPersisted())); |
|
|
|
|
|
|
|
if (deviceRpcCallMsg.getRetries() > 0) { |
|
|
|
metaData.putValue(DataConstants.RETRIES, Integer.toString(deviceRpcCallMsg.getRetries())); |
|
|
|
} |
|
|
|
|
|
|
|
metaData.putValue(DataConstants.EDGE_ID, edge.getId().toString()); |
|
|
|
|
|
|
|
Device device = deviceService.findDeviceById(tenantId, deviceId); |
|
|
|
if (device != null) { |
|
|
|
metaData.putValue("deviceName", device.getName()); |
|
|
|
metaData.putValue("deviceType", device.getType()); |
|
|
|
metaData.putValue(DataConstants.DEVICE_ID, deviceId.getId().toString()); |
|
|
|
} |
|
|
|
|
|
|
|
entityNode.put("method", deviceRpcCallMsg.getRequestMsg().getMethod()); |
|
|
|
entityNode.put("params", deviceRpcCallMsg.getRequestMsg().getParams()); |
|
|
|
|
|
|
|
entityNode.put(DataConstants.ADDITIONAL_INFO, deviceRpcCallMsg.getAdditionalInfo()); |
|
|
|
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, null, metaData, |
|
|
|
TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); |
|
|
|
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(TbQueueMsgMetadata metadata) { |
|
|
|
log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", device, t); |
|
|
|
} |
|
|
|
}); |
|
|
|
toServerRpcPendingMap.put(requestId, new EdgeRpcRequestMetadata(tenantId, edge.getId(), deviceId)); |
|
|
|
scheduler.schedule(() -> processTimeout(requestId), 60000, TimeUnit.MILLISECONDS); |
|
|
|
} catch (JsonProcessingException | IllegalArgumentException e) { |
|
|
|
log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e); |
|
|
|
} |
|
|
|
|
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
|
|
|
|
private void processTimeout(String requestId) { |
|
|
|
EdgeRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); |
|
|
|
if (data != null) { |
|
|
|
// TODO: add failure body
|
|
|
|
saveEdgeEvent(data.getTenantId(), data.getEdgeId(), EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL_RESPONSE, |
|
|
|
data.getDeviceId(), JacksonUtil.OBJECT_MAPPER.valueToTree("{}")); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public DownlinkMsg convertDeviceEventToDownlink(EdgeEvent edgeEvent) { |
|
|
|
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); |
|
|
|
DownlinkMsg downlinkMsg = null; |
|
|
|
@ -401,8 +488,10 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
.build(); |
|
|
|
} |
|
|
|
break; |
|
|
|
case RPC_CALL: |
|
|
|
return convertRpcCallEventToDownlink(edgeEvent); |
|
|
|
case RPC_CALL_REQUEST: |
|
|
|
return convertRpcCallRequestEventToDownlink(edgeEvent); |
|
|
|
case RPC_CALL_RESPONSE: |
|
|
|
return convertRpcCallResponseEventToDownlink(edgeEvent); |
|
|
|
case CREDENTIALS_REQUEST: |
|
|
|
return convertCredentialsRequestEventToDownlink(edgeEvent); |
|
|
|
case ENTITY_MERGE_REQUEST: |
|
|
|
@ -411,10 +500,18 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
private DownlinkMsg convertRpcCallEventToDownlink(EdgeEvent edgeEvent) { |
|
|
|
log.trace("Executing convertRpcCallEventToDownlink, edgeEvent [{}]", edgeEvent); |
|
|
|
DeviceRpcCallMsg deviceRpcCallMsg = |
|
|
|
deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); |
|
|
|
private DownlinkMsg convertRpcCallRequestEventToDownlink(EdgeEvent edgeEvent) { |
|
|
|
log.trace("Executing convertRpcCallRequestEventToDownlink, edgeEvent [{}]", edgeEvent); |
|
|
|
return DownlinkMsg.newBuilder() |
|
|
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt()) |
|
|
|
.addDeviceRpcCallMsg(deviceMsgConstructor.constructDeviceRpcRequestMsg(edgeEvent.getEntityId(), edgeEvent.getBody())) |
|
|
|
.build(); |
|
|
|
} |
|
|
|
|
|
|
|
private DownlinkMsg convertRpcCallResponseEventToDownlink(EdgeEvent edgeEvent) { |
|
|
|
log.trace("Executing convertRpcCallResponseEventToDownlink, edgeEvent [{}]", edgeEvent); |
|
|
|
DeviceRpcCallMsg deviceRpcCallMsg = deviceMsgConstructor.constructDeviceRpcResponseMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); |
|
|
|
toServerRpcPendingMap.remove(Integer.toString(deviceRpcCallMsg.getRequestId())); |
|
|
|
return DownlinkMsg.newBuilder() |
|
|
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt()) |
|
|
|
.addDeviceRpcCallMsg(deviceRpcCallMsg) |
|
|
|
|