|
|
|
@ -81,56 +81,61 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
private DataDecodingEncodingService dataDecodingEncodingService; |
|
|
|
|
|
|
|
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { |
|
|
|
log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); |
|
|
|
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); |
|
|
|
switch (deviceUpdateMsg.getMsgType()) { |
|
|
|
case ENTITY_CREATED_RPC_MESSAGE: |
|
|
|
String deviceName = deviceUpdateMsg.getName(); |
|
|
|
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); |
|
|
|
if (device != null) { |
|
|
|
boolean deviceAlreadyExistsForThisEdge = isDeviceAlreadyExistsOnCloudForThisEdge(tenantId, edge, device); |
|
|
|
if (deviceAlreadyExistsForThisEdge) { |
|
|
|
log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + |
|
|
|
"deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); |
|
|
|
return updateDevice(tenantId, edge, deviceUpdateMsg); |
|
|
|
try { |
|
|
|
log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); |
|
|
|
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); |
|
|
|
switch (deviceUpdateMsg.getMsgType()) { |
|
|
|
case ENTITY_CREATED_RPC_MESSAGE: |
|
|
|
String deviceName = deviceUpdateMsg.getName(); |
|
|
|
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); |
|
|
|
if (device != null) { |
|
|
|
boolean deviceAlreadyExistsForThisEdge = isDeviceAlreadyExistsOnCloudForThisEdge(tenantId, edge, device); |
|
|
|
if (deviceAlreadyExistsForThisEdge) { |
|
|
|
log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + |
|
|
|
"deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); |
|
|
|
return updateDevice(tenantId, edge, deviceUpdateMsg); |
|
|
|
} else { |
|
|
|
log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + |
|
|
|
"Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); |
|
|
|
String newDeviceName = deviceUpdateMsg.getName() + "_" + StringUtils.randomAlphabetic(15); |
|
|
|
try { |
|
|
|
createDevice(tenantId, deviceId, edge, deviceUpdateMsg, newDeviceName); |
|
|
|
} catch (DataValidationException e) { |
|
|
|
log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode(); |
|
|
|
body.put("conflictName", deviceName); |
|
|
|
ListenableFuture<Void> input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, deviceId, body); |
|
|
|
return Futures.transformAsync(input, unused -> |
|
|
|
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null), |
|
|
|
dbCallbackExecutorService); |
|
|
|
} |
|
|
|
} else { |
|
|
|
log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + |
|
|
|
"Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); |
|
|
|
String newDeviceName = deviceUpdateMsg.getName() + "_" + StringUtils.randomAlphabetic(15); |
|
|
|
log.info("[{}] Creating new device on the cloud [{}]", tenantId, deviceUpdateMsg); |
|
|
|
try { |
|
|
|
createDevice(tenantId, deviceId, edge, deviceUpdateMsg, newDeviceName); |
|
|
|
createDevice(tenantId, deviceId, edge, deviceUpdateMsg, deviceUpdateMsg.getName()); |
|
|
|
} catch (DataValidationException e) { |
|
|
|
log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode(); |
|
|
|
body.put("conflictName", deviceName); |
|
|
|
ListenableFuture<Void> input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, deviceId, body); |
|
|
|
return Futures.transformAsync(input, unused -> |
|
|
|
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null), |
|
|
|
dbCallbackExecutorService); |
|
|
|
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); |
|
|
|
} |
|
|
|
} else { |
|
|
|
log.info("[{}] Creating new device on the cloud [{}]", tenantId, deviceUpdateMsg); |
|
|
|
try { |
|
|
|
createDevice(tenantId, deviceId, edge, deviceUpdateMsg, deviceUpdateMsg.getName()); |
|
|
|
} catch (DataValidationException e) { |
|
|
|
log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
case ENTITY_UPDATED_RPC_MESSAGE: |
|
|
|
return updateDevice(tenantId, edge, deviceUpdateMsg); |
|
|
|
case ENTITY_DELETED_RPC_MESSAGE: |
|
|
|
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); |
|
|
|
if (deviceToDelete != null) { |
|
|
|
deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); |
|
|
|
} |
|
|
|
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); |
|
|
|
} |
|
|
|
case ENTITY_UPDATED_RPC_MESSAGE: |
|
|
|
return updateDevice(tenantId, edge, deviceUpdateMsg); |
|
|
|
case ENTITY_DELETED_RPC_MESSAGE: |
|
|
|
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); |
|
|
|
if (deviceToDelete != null) { |
|
|
|
deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); |
|
|
|
} |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
case UNRECOGNIZED: |
|
|
|
default: |
|
|
|
return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType()); |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
case UNRECOGNIZED: |
|
|
|
default: |
|
|
|
return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType()); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Failed to process device message from edge, {}", deviceUpdateMsg, e); |
|
|
|
return Futures.immediateFailedFuture(e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|