diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 73887b4723..58781c51f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -81,56 +81,61 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { private DataDecodingEncodingService dataDecodingEncodingService; public ListenableFuture processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { - log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); - DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); - switch (deviceUpdateMsg.getMsgType()) { - case ENTITY_CREATED_RPC_MESSAGE: - String deviceName = deviceUpdateMsg.getName(); - Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); - if (device != null) { - boolean deviceAlreadyExistsForThisEdge = isDeviceAlreadyExistsOnCloudForThisEdge(tenantId, edge, device); - if (deviceAlreadyExistsForThisEdge) { - log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + - "deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); - return updateDevice(tenantId, edge, deviceUpdateMsg); + try { + log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); + DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); + switch (deviceUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + String deviceName = deviceUpdateMsg.getName(); + Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); + if (device != null) { + boolean deviceAlreadyExistsForThisEdge = isDeviceAlreadyExistsOnCloudForThisEdge(tenantId, edge, device); + if (deviceAlreadyExistsForThisEdge) { + log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + + "deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); + return updateDevice(tenantId, edge, deviceUpdateMsg); + } else { + log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + + "Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); + String newDeviceName = deviceUpdateMsg.getName() + "_" + StringUtils.randomAlphabetic(15); + try { + createDevice(tenantId, deviceId, edge, deviceUpdateMsg, newDeviceName); + } catch (DataValidationException e) { + log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); + return Futures.immediateFuture(null); + } + ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + body.put("conflictName", deviceName); + ListenableFuture input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, deviceId, body); + return Futures.transformAsync(input, unused -> + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null), + dbCallbackExecutorService); + } } else { - log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + - "Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); - String newDeviceName = deviceUpdateMsg.getName() + "_" + StringUtils.randomAlphabetic(15); + log.info("[{}] Creating new device on the cloud [{}]", tenantId, deviceUpdateMsg); try { - createDevice(tenantId, deviceId, edge, deviceUpdateMsg, newDeviceName); + createDevice(tenantId, deviceId, edge, deviceUpdateMsg, deviceUpdateMsg.getName()); } catch (DataValidationException e) { log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); return Futures.immediateFuture(null); } - ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode(); - body.put("conflictName", deviceName); - ListenableFuture input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, deviceId, body); - return Futures.transformAsync(input, unused -> - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null), - dbCallbackExecutorService); + return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); } - } else { - log.info("[{}] Creating new device on the cloud [{}]", tenantId, deviceUpdateMsg); - try { - createDevice(tenantId, deviceId, edge, deviceUpdateMsg, deviceUpdateMsg.getName()); - } catch (DataValidationException e) { - log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); - return Futures.immediateFuture(null); + case ENTITY_UPDATED_RPC_MESSAGE: + return updateDevice(tenantId, edge, deviceUpdateMsg); + case ENTITY_DELETED_RPC_MESSAGE: + Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); + if (deviceToDelete != null) { + deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); } - return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); - } - case ENTITY_UPDATED_RPC_MESSAGE: - return updateDevice(tenantId, edge, deviceUpdateMsg); - case ENTITY_DELETED_RPC_MESSAGE: - Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); - if (deviceToDelete != null) { - deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); - } - return Futures.immediateFuture(null); - case UNRECOGNIZED: - default: - return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType()); + return Futures.immediateFuture(null); + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType()); + } + } catch (Exception e) { + log.error("Failed to process device message from edge, {}", deviceUpdateMsg, e); + return Futures.immediateFailedFuture(e); } }