diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index af892615ea..0b28aca7b0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -107,6 +107,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -200,8 +201,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { boolean sent = false; if (systemContext.isEdgesEnabled() && edgeId != null) { log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId()); - saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()); - sent = true; + try { + saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()).get(); + sent = true; + } catch (InterruptedException | ExecutionException e) { + String errMsg = String.format("[%s][%s][%s] Failed to save rpc request to edge queue %s", tenantId, deviceId, edgeId.getId(), request); + log.error(errMsg, e); + } } else if (isSendNewRpcAvailable()) { sent = rpcSubscriptions.size() > 0; Set syncSessionSet = new HashSet<>(); @@ -810,7 +816,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { systemContext.getTbCoreToTransportService().process(nodeId, msg); } - private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) { + private ListenableFuture saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) { ObjectNode body = mapper.createObjectNode(); body.put("requestId", requestId); body.put("requestUUID", msg.getId().toString()); @@ -821,17 +827,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body); - Futures.addCallback(systemContext.getEdgeEventService().saveAsync(edgeEvent), new FutureCallback<>() { - @Override - public void onSuccess(Void unused) { - systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - String errMsg = String.format("Failed to save edge event. msg [%s], edge event [%s]", msg, edgeEvent); - log.warn(errMsg, t); - } + return Futures.transform(systemContext.getEdgeEventService().saveAsync(edgeEvent), unused -> { + systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); + return null; }, systemContext.getDbCallbackExecutor()); } diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index f3d73f13e5..a516a1e4a3 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -180,7 +180,7 @@ public class EdgeController extends BaseController { } } - private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated, SecurityUser user) throws IOException, ThingsboardException { + private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated, SecurityUser user) throws Exception { if (!updated) { ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId()); edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index b24cf1d912..6b5e519f80 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; @@ -46,7 +47,6 @@ import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.io.IOException; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -95,14 +95,14 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { } @Override - public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException { + public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws Exception { edge.setRootRuleChainId(ruleChainId); Edge savedEdge = edgeService.saveEdge(edge); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.UPDATED, ruleChainId, null); + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.UPDATED, ruleChainId, null).get(); return savedEdge; } - private void saveEdgeEvent(TenantId tenantId, + private ListenableFuture saveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, @@ -113,17 +113,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body); - Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void unused) { - clusterService.onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent); - log.warn(errMsg, t); - } + return Futures.transform(edgeEventService.saveAsync(edgeEvent), unused -> { + clusterService.onEdgeEventUpdate(tenantId, edgeId); + return null; }, dbCallBackExecutor); } @@ -133,9 +125,10 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { try { TenantId tenantId = TenantId.fromUUID(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB())); EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); + ListenableFuture future; switch (type) { case EDGE: - edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg); + future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg); break; case USER: case ASSET: @@ -144,33 +137,47 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { case ENTITY_VIEW: case DASHBOARD: case RULE_CHAIN: - entityProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + future = entityProcessor.processEntityNotification(tenantId, edgeNotificationMsg); break; case CUSTOMER: - customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg); + future = customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg); break; case WIDGETS_BUNDLE: case WIDGET_TYPE: - entityProcessor.processEntityNotificationForAllEdges(tenantId, edgeNotificationMsg); + future = entityProcessor.processEntityNotificationForAllEdges(tenantId, edgeNotificationMsg); break; case ALARM: - alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); + future = alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); break; case RELATION: - relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); + future = relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); break; default: log.warn("Edge event type [{}] is not designed to be pushed to edge", type); + future = Futures.immediateFuture(null); } + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void unused) { + callback.onSuccess(); + } + + @Override + public void onFailure(Throwable throwable) { + callBackFailure(edgeNotificationMsg, callback, throwable); + } + }, dbCallBackExecutor); } catch (Exception e) { - callback.onFailure(e); - String errMsg = String.format("Can't push to edge updates, edgeNotificationMsg [%s]", edgeNotificationMsg); - log.error(errMsg, e); - } finally { - callback.onSuccess(); + callBackFailure(edgeNotificationMsg, callback, e); } } + private void callBackFailure(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback, Throwable throwable) { + String errMsg = String.format("Can't push to edge updates, edgeNotificationMsg [%s]", edgeNotificationMsg); + log.error(errMsg, throwable); + callback.onFailure(throwable); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeNotificationService.java index be6741cef6..ee36c80454 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeNotificationService.java @@ -21,11 +21,9 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos; -import java.io.IOException; - public interface EdgeNotificationService { - Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException; + Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws Exception; void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java index ccd3f2362c..a63e377a5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java @@ -16,11 +16,9 @@ package org.thingsboard.server.service.edge.rpc.processor; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; @@ -43,6 +41,8 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; @Component @@ -148,49 +148,44 @@ public class AlarmEdgeProcessor extends BaseEdgeProcessor { return downlinkMsg; } - public void processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { + public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); switch (actionType) { case DELETED: EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); - Alarm alarm = mapper.readValue(edgeNotificationMsg.getBody(), Alarm.class); - saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, mapper.valueToTree(alarm)); - break; + Alarm deletedAlarm = mapper.readValue(edgeNotificationMsg.getBody(), Alarm.class); + return saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, mapper.valueToTree(deletedAlarm)); default: ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); - Futures.addCallback(alarmFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Alarm alarm) { - if (alarm != null) { - EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); - if (type != null) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId edgeId : pageData.getData()) { - saveEdgeEvent(tenantId, - edgeId, - EdgeEventType.ALARM, - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), - alarmId, - null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } - } + return Futures.transformAsync(alarmFuture, alarm -> { + if (alarm == null) { + return Futures.immediateFuture(null); } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] can't find alarm by id [{}] {}", tenantId.getId(), alarmId.getId(), t); + EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); + if (type == null) { + return Futures.immediateFuture(null); } + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + List> futures = new ArrayList<>(); + do { + pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + for (EdgeId relatedEdgeId : pageData.getData()) { + futures.add(saveEdgeEvent(tenantId, + relatedEdgeId, + EdgeEventType.ALARM, + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), + alarmId, + null)); + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); }, dbCallbackExecutorService); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 2883ab78f4..09bef32af9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -17,10 +17,9 @@ package org.thingsboard.server.service.edge.rpc.processor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; @@ -71,6 +70,9 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.state.DeviceStateService; +import java.util.ArrayList; +import java.util.List; + @Slf4j public abstract class BaseEdgeProcessor { @@ -183,29 +185,21 @@ public abstract class BaseEdgeProcessor { @Autowired protected DbCallbackExecutorService dbCallbackExecutorService; - protected void saveEdgeEvent(TenantId tenantId, - EdgeId edgeId, - EdgeEventType type, - EdgeEventActionType action, - EntityId entityId, - JsonNode body) { + protected ListenableFuture saveEdgeEvent(TenantId tenantId, + EdgeId edgeId, + EdgeEventType type, + EdgeEventActionType action, + EntityId entityId, + JsonNode body) { log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type[{}], " + "action [{}], entityId [{}], body [{}]", tenantId, edgeId, type, action, entityId, body); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body); - Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void unused) { - tbClusterService.onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent); - log.warn(errMsg, t); - } + return Futures.transform(edgeEventService.saveAsync(edgeEvent), unused -> { + tbClusterService.onEdgeEventUpdate(tenantId, edgeId); + return null; }, dbCallbackExecutorService); } @@ -217,19 +211,21 @@ public abstract class BaseEdgeProcessor { } } - protected void processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) { + protected ListenableFuture processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; + List> futures = new ArrayList<>(); do { pageData = edgeService.findEdgesByTenantId(tenantId, pageLink); if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { for (Edge edge : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null); + futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null)); } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); } } } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java index 6c670e47ce..3554057482 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.service.edge.rpc.processor; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.Customer; @@ -35,6 +37,8 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; @Component @@ -70,7 +74,7 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor { return downlinkMsg; } - public void processCustomerNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + public ListenableFuture processCustomerNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); UUID uuid = new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()); @@ -79,22 +83,24 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor { case UPDATED: PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; + List> futures = new ArrayList<>(); do { pageData = edgeService.findEdgesByTenantIdAndCustomerId(tenantId, customerId, pageLink); if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { for (Edge edge : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null); + futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null)); } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); } } } while (pageData != null && pageData.hasNext()); - break; + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); case DELETED: EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); - saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null); - break; + return saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null); + default: + return Futures.immediateFuture(null); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index f19ec25b27..0e1dee7132 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -84,7 +84,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { if (deviceAlreadyExistsForThisEdge) { log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + "deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); - updateDevice(tenantId, edge, deviceUpdateMsg); + return updateDevice(tenantId, edge, deviceUpdateMsg); } else { log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + "Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); @@ -99,8 +99,10 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { } ObjectNode body = mapper.createObjectNode(); body.put("conflictName", deviceName); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null); + ListenableFuture input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); + return Futures.transformAsync(input, unused -> + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null), + dbCallbackExecutorService); } } else { log.info("[{}] Creating new device and replacing device entity on the edge [{}]", tenantId, deviceUpdateMsg); @@ -111,24 +113,22 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { log.error(errMsg, e); return Futures.immediateFuture(null); } - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null); + return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null); } - break; case ENTITY_UPDATED_RPC_MESSAGE: - updateDevice(tenantId, edge, deviceUpdateMsg); - break; + return updateDevice(tenantId, edge, deviceUpdateMsg); case ENTITY_DELETED_RPC_MESSAGE: DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); if (deviceToDelete != null) { deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); } - break; + return Futures.immediateFuture(null); case UNRECOGNIZED: + default: log.error("Unsupported msg type {}", deviceUpdateMsg.getMsgType()); return Futures.immediateFailedFuture(new RuntimeException("Unsupported msg type " + deviceUpdateMsg.getMsgType())); } - return Futures.immediateFuture(null); } private boolean isDeviceAlreadyExistsOnCloudForThisEdge(TenantId tenantId, Edge edge, Device device) { @@ -174,7 +174,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { } - private void updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { + private ListenableFuture updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); Device device = deviceService.findDeviceById(tenantId, deviceId); if (device != null) { @@ -194,9 +194,11 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { } Device savedDevice = deviceService.saveDevice(device); tbClusterService.onDeviceUpdated(savedDevice, device); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); + return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); } else { - log.warn("[{}] can't find device [{}], edge [{}]", tenantId, deviceUpdateMsg, edge.getId()); + String errMsg = String.format("[%s] can't find device [%s], edge [%s]", tenantId, deviceUpdateMsg, edge.getId()); + log.warn(errMsg); + return Futures.immediateFailedFuture(new RuntimeException(errMsg)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java index 3d662da232..fdbebafc98 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java @@ -15,11 +15,9 @@ */ package org.thingsboard.server.service.edge.rpc.processor; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.edge.Edge; @@ -33,6 +31,8 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; @Component @@ -40,7 +40,7 @@ import java.util.UUID; @TbCoreComponent public class EdgeProcessor extends BaseEdgeProcessor { - public void processEdgeNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + public ListenableFuture processEdgeNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { try { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); @@ -49,54 +49,43 @@ public class EdgeProcessor extends BaseEdgeProcessor { case ASSIGNED_TO_CUSTOMER: CustomerId customerId = mapper.readValue(edgeNotificationMsg.getBody(), CustomerId.class); edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); - Futures.addCallback(edgeFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Edge edge) { - if (edge != null && !customerId.isNullUid()) { - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null); - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = userService.findCustomerUsers(tenantId, customerId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size()); - for (User user : pageData.getData()) { - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't find edge by id [{}]", edgeNotificationMsg, t); + return Futures.transformAsync(edgeFuture, edge -> { + if (edge == null || customerId.isNullUid()) { + return Futures.immediateFuture(null); } + List> futures = new ArrayList<>(); + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null)); + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + do { + pageData = userService.findCustomerUsers(tenantId, customerId, pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size()); + for (User user : pageData.getData()) { + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null)); + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); }, dbCallbackExecutorService); - break; case UNASSIGNED_FROM_CUSTOMER: CustomerId customerIdToDelete = mapper.readValue(edgeNotificationMsg.getBody(), CustomerId.class); edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); - Futures.addCallback(edgeFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Edge edge) { - if (edge != null && !customerIdToDelete.isNullUid()) { - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't find edge by id [{}]", edgeNotificationMsg, t); + return Futures.transformAsync(edgeFuture, edge -> { + if (edge == null || customerIdToDelete.isNullUid()) { + return Futures.immediateFuture(null); } + return saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null); }, dbCallbackExecutorService); - break; + default: + return Futures.immediateFuture(null); } } catch (Exception e) { log.error("Exception during processing edge event", e); + return Futures.immediateFailedFuture(e); } } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityEdgeProcessor.java index 89ce662467..195f932b93 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityEdgeProcessor.java @@ -15,11 +15,9 @@ */ package org.thingsboard.server.service.edge.rpc.processor; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -45,6 +43,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -89,92 +88,107 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor { return downlinkMsg; } - public void processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + public ListenableFuture processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); - EdgeId edgeId = null; - if (edgeNotificationMsg.getEdgeIdMSB() != 0 && edgeNotificationMsg.getEdgeIdLSB() != 0) { - edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); - } + EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg); switch (actionType) { case ADDED: // used only for USER entity case UPDATED: case CREDENTIALS_UPDATED: - pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType); - break; + return pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType); case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - try { - CustomerId customerId = mapper.readValue(edgeNotificationMsg.getBody(), CustomerId.class); - ListenableFuture future = edgeService.findEdgeByIdAsync(tenantId, relatedEdgeId); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Edge edge) { - if (edge != null && edge.getCustomerId() != null && - !edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(customerId)) { - saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null); - } - } - - @Override - public void onFailure(Throwable t) { - log.error("Failed to find edge by id [{}] {}", edgeNotificationMsg, t); - } - }, dbCallbackExecutorService); - } catch (Exception e) { - log.error("Can't parse customer id from entity body [{}]", edgeNotificationMsg, e); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - break; + return pushNotificationToAllRelatedCustomerEdges(tenantId, edgeNotificationMsg, entityId, actionType, type); case DELETED: if (edgeId != null) { - saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null); + return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null); } else { - pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType); + return pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType); } - break; case ASSIGNED_TO_EDGE: case UNASSIGNED_FROM_EDGE: - saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null); - if (type.equals(EdgeEventType.RULE_CHAIN)) { - updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId); + ListenableFuture future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null); + return Futures.transformAsync(future, unused -> { + if (type.equals(EdgeEventType.RULE_CHAIN)) { + return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId); + } else { + return Futures.immediateFuture(null); + } + }, dbCallbackExecutorService); + default: + return Futures.immediateFuture(null); + } + } + + private ListenableFuture pushNotificationToAllRelatedCustomerEdges(TenantId tenantId, + TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, + EntityId entityId, + EdgeEventActionType actionType, + EdgeEventType type) { + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + List> futures = new ArrayList<>(); + do { + pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + for (EdgeId relatedEdgeId : pageData.getData()) { + try { + CustomerId customerId = mapper.readValue(edgeNotificationMsg.getBody(), CustomerId.class); + ListenableFuture future = edgeService.findEdgeByIdAsync(tenantId, relatedEdgeId); + futures.add(Futures.transformAsync(future, edge -> { + if (edge != null && edge.getCustomerId() != null && + !edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(customerId)) { + return saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null); + } else { + return Futures.immediateFuture(null); + } + }, dbCallbackExecutorService)); + } catch (Exception e) { + log.error("Can't parse customer id from entity body [{}]", edgeNotificationMsg, e); + return Futures.immediateFailedFuture(e); + } } - break; + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + } + + private EdgeId safeGetEdgeId(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + if (edgeNotificationMsg.getEdgeIdMSB() != 0 && edgeNotificationMsg.getEdgeIdLSB() != 0) { + return new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); + } else { + return null; } } - private void pushNotificationToAllRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType) { + private ListenableFuture pushNotificationToAllRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType) { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; + List> futures = new ArrayList<>(); do { pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { for (EdgeId relatedEdgeId : pageData.getData()) { - saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null); + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); } } } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } - private void updateDependentRuleChains(TenantId tenantId, RuleChainId processingRuleChainId, EdgeId edgeId) { + private ListenableFuture updateDependentRuleChains(TenantId tenantId, RuleChainId processingRuleChainId, EdgeId edgeId) { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; + List> futures = new ArrayList<>(); do { pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edgeId, pageLink); if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { @@ -185,12 +199,12 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor { if (connectionInfos != null && !connectionInfos.isEmpty()) { for (RuleChainConnectionInfo connectionInfo : connectionInfos) { if (connectionInfo.getTargetRuleChainId().equals(processingRuleChainId)) { - saveEdgeEvent(tenantId, + futures.add(saveEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.UPDATED, ruleChain.getId(), - null); + null)); } } } @@ -201,9 +215,10 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor { } } } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } - public void processEntityNotificationForAllEdges(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + public ListenableFuture processEntityNotificationForAllEdges(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); @@ -211,8 +226,9 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor { case ADDED: case UPDATED: case DELETED: - processActionForAllEdges(tenantId, type, actionType, entityId); - break; + return processActionForAllEdges(tenantId, type, actionType, entityId); + default: + return Futures.immediateFuture(null); } } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java index d49ac7094c..f86ea8aad2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java @@ -126,24 +126,29 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor { .build(); } - public void processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { + public ListenableFuture processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { EntityRelation relation = mapper.readValue(edgeNotificationMsg.getBody(), EntityRelation.class); - if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && - !relation.getTo().getEntityType().equals(EntityType.EDGE)) { - Set uniqueEdgeIds = new HashSet<>(); - uniqueEdgeIds.addAll(findRelatedEdgeIds(tenantId, relation.getTo())); - uniqueEdgeIds.addAll(findRelatedEdgeIds(tenantId, relation.getFrom())); - if (!uniqueEdgeIds.isEmpty()) { - for (EdgeId edgeId : uniqueEdgeIds) { - saveEdgeEvent(tenantId, - edgeId, - EdgeEventType.RELATION, - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), - null, - mapper.valueToTree(relation)); - } - } + if (relation.getFrom().getEntityType().equals(EntityType.EDGE) || + relation.getTo().getEntityType().equals(EntityType.EDGE)) { + return Futures.immediateFuture(null); + } + + Set uniqueEdgeIds = new HashSet<>(); + uniqueEdgeIds.addAll(findRelatedEdgeIds(tenantId, relation.getTo())); + uniqueEdgeIds.addAll(findRelatedEdgeIds(tenantId, relation.getFrom())); + if (uniqueEdgeIds.isEmpty()) { + return Futures.immediateFuture(null); + } + List> futures = new ArrayList<>(); + for (EdgeId edgeId : uniqueEdgeIds) { + futures.add(saveEdgeEvent(tenantId, + edgeId, + EdgeEventType.RELATION, + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), + null, + mapper.valueToTree(relation))); } + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } private List findRelatedEdgeIds(TenantId tenantId, EntityId entityId) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 3e8a2dd352..9d87cabb2c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; @@ -72,7 +73,6 @@ import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.v1.UserCredentialsRequestMsg; import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg; import org.thingsboard.server.service.executors.DbCallbackExecutorService; -import org.thingsboard.server.cluster.TbClusterService; import java.util.ArrayList; import java.util.HashMap; @@ -121,13 +121,13 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Override public ListenableFuture processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) { log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg); - if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { - RuleChainId ruleChainId = - new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB())); - saveEdgeEvent(tenantId, edge.getId(), - EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null); + if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() == 0 || ruleChainMetadataRequestMsg.getRuleChainIdLSB() == 0) { + return Futures.immediateFuture(null); } - return Futures.immediateFuture(null); + RuleChainId ruleChainId = + new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB())); + return saveEdgeEvent(tenantId, edge.getId(), + EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null); } @Override @@ -137,63 +137,72 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EntityType.valueOf(attributesRequestMsg.getEntityType()), new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB())); final EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); - if (type != null) { - SettableFuture futureToSet = SettableFuture.create(); - String scope = attributesRequestMsg.getScope(); - ListenableFuture> findAttrFuture = attributesService.findAll(tenantId, entityId, scope); - Futures.addCallback(findAttrFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable List ssAttributes) { - if (ssAttributes != null && !ssAttributes.isEmpty()) { - try { - Map entityData = new HashMap<>(); - ObjectNode attributes = mapper.createObjectNode(); - for (AttributeKvEntry attr : ssAttributes) { - if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { - attributes.put(attr.getKey(), attr.getBooleanValue().get()); - } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { - attributes.put(attr.getKey(), attr.getDoubleValue().get()); - } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { - attributes.put(attr.getKey(), attr.getLongValue().get()); - } else { - attributes.put(attr.getKey(), attr.getValueAsString()); - } - } - entityData.put("kv", attributes); - entityData.put("scope", scope); - JsonNode body = mapper.valueToTree(entityData); - log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); - saveEdgeEvent(tenantId, - edge.getId(), - type, - EdgeEventActionType.ATTRIBUTES_UPDATED, - entityId, - body); - } catch (Exception e) { - log.error("[{}] Failed to save attribute updates to the edge", edge.getName(), e); - futureToSet.setException(new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e)); - return; - } - } else { - log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, - edge.getName(), - entityId.getEntityType(), - entityId.getId()); - } + if (type == null) { + log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType()); + return Futures.immediateFuture(null); + } + SettableFuture futureToSet = SettableFuture.create(); + String scope = attributesRequestMsg.getScope(); + ListenableFuture> findAttrFuture = attributesService.findAll(tenantId, entityId, scope); + Futures.addCallback(findAttrFuture, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List ssAttributes) { + if (ssAttributes == null || ssAttributes.isEmpty()) { + log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, + edge.getName(), + entityId.getEntityType(), + entityId.getId()); futureToSet.set(null); + return; } - @Override - public void onFailure(Throwable t) { - log.error("Can't find attributes [{}]", attributesRequestMsg, t); - futureToSet.setException(t); + try { + Map entityData = new HashMap<>(); + ObjectNode attributes = mapper.createObjectNode(); + for (AttributeKvEntry attr : ssAttributes) { + if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { + attributes.put(attr.getKey(), attr.getBooleanValue().get()); + } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { + attributes.put(attr.getKey(), attr.getDoubleValue().get()); + } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { + attributes.put(attr.getKey(), attr.getLongValue().get()); + } else { + attributes.put(attr.getKey(), attr.getValueAsString()); + } + } + entityData.put("kv", attributes); + entityData.put("scope", scope); + JsonNode body = mapper.valueToTree(entityData); + log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); + ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), type, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void unused) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable throwable) { + String errMsg = String.format("[%s] Failed to save edge event [%s]", edge.getId(), attributesRequestMsg); + log.error(errMsg, throwable); + futureToSet.setException(new RuntimeException(errMsg, throwable)); + } + }, dbCallbackExecutorService); + } catch (Exception e) { + String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg); + log.error(errMsg, e); + futureToSet.setException(new RuntimeException(errMsg, e)); } - }, dbCallbackExecutorService); - return futureToSet; - } else { - log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType()); - return Futures.immediateFuture(null); - } + } + + @Override + public void onFailure(Throwable t) { + String errMsg = String.format("[%s] Can't find attributes [%s]", edge.getId(), attributesRequestMsg); + log.error(errMsg, t); + futureToSet.setException(new RuntimeException(errMsg, t)); + } + }, dbCallbackExecutorService); + return futureToSet; } @Override @@ -208,33 +217,49 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { futures.add(findRelationByQuery(tenantId, edge, entityId, EntitySearchDirection.TO)); ListenableFuture>> relationsListFuture = Futures.allAsList(futures); SettableFuture futureToSet = SettableFuture.create(); - Futures.addCallback(relationsListFuture, new FutureCallback>>() { + Futures.addCallback(relationsListFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable List> relationsList) { try { if (relationsList != null && !relationsList.isEmpty()) { + List> futures = new ArrayList<>(); for (List entityRelations : relationsList) { log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size()); for (EntityRelation relation : entityRelations) { try { if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && !relation.getTo().getEntityType().equals(EntityType.EDGE)) { - saveEdgeEvent(tenantId, + futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RELATION, EdgeEventActionType.ADDED, null, - mapper.valueToTree(relation)); + mapper.valueToTree(relation))); } } catch (Exception e) { - log.error("Exception during loading relation [{}] to edge on sync!", relation, e); - futureToSet.setException(e); + String errMsg = String.format("[%s] Exception during loading relation [%s] to edge on sync!", edge.getId(), relation); + log.error(errMsg, e); + futureToSet.setException(new RuntimeException(errMsg, e)); return; } } } + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List voids) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable throwable) { + String errMsg = String.format("[%s] Exception during saving edge events [%s]!", edge.getId(), relationRequestMsg); + log.error(errMsg, throwable); + futureToSet.setException(new RuntimeException(errMsg, throwable)); + } + }, dbCallbackExecutorService); + } else { + futureToSet.set(null); } - futureToSet.set(null); } catch (Exception e) { log.error("Exception during loading relation(s) to edge on sync!", e); futureToSet.setException(e); @@ -243,8 +268,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Override public void onFailure(Throwable t) { - log.error("[{}] Can't find relation by query. Entity id [{}]", tenantId, entityId, t); - futureToSet.setException(t); + String errMsg = String.format("[%s] Can't find relation by query. Entity id [%s]!", tenantId, entityId); + log.error(errMsg, t); + futureToSet.setException(new RuntimeException(errMsg, t)); } }, dbCallbackExecutorService); return futureToSet; @@ -260,40 +286,42 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Override public ListenableFuture processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) { log.trace("[{}] processDeviceCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), deviceCredentialsRequestMsg); - if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) { - DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB())); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, - EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null); + if (deviceCredentialsRequestMsg.getDeviceIdMSB() == 0 || deviceCredentialsRequestMsg.getDeviceIdLSB() == 0) { + return Futures.immediateFuture(null); } - return Futures.immediateFuture(null); + DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB())); + return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, + EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null); } @Override public ListenableFuture processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) { log.trace("[{}] processUserCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), userCredentialsRequestMsg); - if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) { - UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB())); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, - EdgeEventActionType.CREDENTIALS_UPDATED, userId, null); + if (userCredentialsRequestMsg.getUserIdMSB() == 0 || userCredentialsRequestMsg.getUserIdLSB() == 0) { + return Futures.immediateFuture(null); } - return Futures.immediateFuture(null); + UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB())); + return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, + EdgeEventActionType.CREDENTIALS_UPDATED, userId, null); } @Override public ListenableFuture processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg) { log.trace("[{}] processDeviceProfileDevicesRequestMsg [{}][{}]", tenantId, edge.getName(), deviceProfileDevicesRequestMsg); - if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() != 0 && deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() != 0) { - DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB())); - DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); - if (deviceProfileById != null) { - syncDevices(tenantId, edge, deviceProfileById.getName()); - } + if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() == 0 || deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() == 0) { + return Futures.immediateFuture(null); } - return Futures.immediateFuture(null); + DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB())); + DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); + if (deviceProfileById == null) { + return Futures.immediateFuture(null); + } + return syncDevices(tenantId, edge, deviceProfileById.getName()); } - private void syncDevices(TenantId tenantId, Edge edge, String deviceType) { + private ListenableFuture syncDevices(TenantId tenantId, Edge edge, String deviceType) { log.trace("[{}] syncDevices [{}][{}]", tenantId, edge.getName(), deviceType); + List> futures = new ArrayList<>(); try { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; @@ -302,7 +330,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); for (Device device : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null); + futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null)); } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); @@ -312,25 +340,26 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } catch (Exception e) { log.error("Exception during loading edge device(s) on sync!", e); } + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } @Override public ListenableFuture processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg) { log.trace("[{}] processWidgetBundleTypesRequestMsg [{}][{}]", tenantId, edge.getName(), widgetBundleTypesRequestMsg); + List> futures = new ArrayList<>(); if (widgetBundleTypesRequestMsg.getWidgetBundleIdMSB() != 0 && widgetBundleTypesRequestMsg.getWidgetBundleIdLSB() != 0) { WidgetsBundleId widgetsBundleId = new WidgetsBundleId(new UUID(widgetBundleTypesRequestMsg.getWidgetBundleIdMSB(), widgetBundleTypesRequestMsg.getWidgetBundleIdLSB())); WidgetsBundle widgetsBundleById = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId); if (widgetsBundleById != null) { List widgetTypesToPush = widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundleById.getTenantId(), widgetsBundleById.getAlias()); - for (WidgetType widgetType : widgetTypesToPush) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null); + futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null)); } } } - return Futures.immediateFuture(null); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } @Override @@ -343,46 +372,35 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { Futures.addCallback(entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), new FutureCallback<>() { @Override public void onSuccess(@Nullable List entityViews) { - try { - if (entityViews != null && !entityViews.isEmpty()) { - List> futures = new ArrayList<>(); - for (EntityView entityView : entityViews) { - ListenableFuture future = relationService.checkRelation(tenantId, edge.getId(), entityView.getId(), - EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); - futures.add(future); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Boolean result) { - if (Boolean.TRUE.equals(result)) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, - EdgeEventActionType.ADDED, entityView.getId(), null); - } - } - @Override - public void onFailure(Throwable t) { - // Do nothing - error handles in allAsList - } - }, dbCallbackExecutorService); + if (entityViews == null || entityViews.isEmpty()) { + futureToSet.set(null); + return; + } + List> futures = new ArrayList<>(); + for (EntityView entityView : entityViews) { + ListenableFuture future = relationService.checkRelation(tenantId, edge.getId(), entityView.getId(), + EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); + futures.add(Futures.transformAsync(future, result -> { + if (Boolean.TRUE.equals(result)) { + return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, + EdgeEventActionType.ADDED, entityView.getId(), null); + } else { + return Futures.immediateFuture(null); } - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List result) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Exception during loading relation [{}] to edge on sync!", t, t); - futureToSet.setException(t); - } - }, dbCallbackExecutorService); - } else { + }, dbCallbackExecutorService)); + } + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { futureToSet.set(null); } - } catch (Exception e) { - log.error("Exception during loading relation(s) to edge on sync!", e); - futureToSet.setException(e); - } + + @Override + public void onFailure(Throwable t) { + log.error("Exception during loading relation to edge on sync!", t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); } @Override @@ -394,7 +412,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { return futureToSet; } - private void saveEdgeEvent(TenantId tenantId, + private ListenableFuture saveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, @@ -405,17 +423,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body); - Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void unused) { - tbClusterService.onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent); - log.warn(errMsg, t); - } + return Futures.transform(edgeEventService.saveAsync(edgeEvent), unused -> { + tbClusterService.onEdgeEventUpdate(tenantId, edgeId); + return null; }, dbCallbackExecutorService); }