From c7b0878ac136d2acf3d640d790e2db631def5bfa Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 4 Sep 2023 11:12:58 +0300 Subject: [PATCH] Move pushEntityCreatedEventToRuleEngine method to base processor --- .../edge/rpc/processor/BaseEdgeProcessor.java | 30 +++++++++++++---- .../processor/asset/AssetEdgeProcessor.java | 29 ++++------------ .../asset/AssetProfileEdgeProcessor.java | 29 ++++------------ .../dashboard/DashboardEdgeProcessor.java | 29 ++++------------ .../processor/device/DeviceEdgeProcessor.java | 19 +++-------- .../device/DeviceProfileEdgeProcessor.java | 29 ++++------------ .../entityview/EntityViewEdgeProcessor.java | 33 +++++-------------- 7 files changed, 59 insertions(+), 139 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 239f72dcd6..1715c5212e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -46,12 +46,15 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; @@ -78,6 +81,8 @@ import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; @@ -566,18 +571,29 @@ public abstract class BaseEdgeProcessor { relationService.saveRelation(tenantId, relation); } - protected TbMsgMetaData getActionTbMsgMetaData(Edge edge, CustomerId customerId) { - TbMsgMetaData metaData = getTbMsgMetaData(edge); + protected TbMsgMetaData getEdgeActionTbMsgMetaData(Edge edge, CustomerId customerId) { + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("edgeId", edge.getId().toString()); + metaData.putValue("edgeName", edge.getName()); if (customerId != null && !customerId.isNullUid()) { metaData.putValue("customerId", customerId.toString()); } return metaData; } - protected TbMsgMetaData getTbMsgMetaData(Edge edge) { - TbMsgMetaData metaData = new TbMsgMetaData(); - metaData.putValue("edgeId", edge.getId().toString()); - metaData.putValue("edgeName", edge.getName()); - return metaData; + protected void pushEntityCreatedEventToRuleEngine(TenantId tenantId, EntityId entityId, CustomerId customerId, + String entity, TbMsgMetaData metaData) { + TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, entityId, customerId, metaData, TbMsgDataType.JSON, entity); + tbClusterService.pushMsgToRuleEngine(tenantId, entityId, tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entity); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entity, t); + } + }); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index 0c41c7e0a0..71a39b16f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -15,15 +15,12 @@ */ package org.thingsboard.server.service.edge.rpc.processor.asset; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; @@ -35,15 +32,12 @@ import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.asset.BaseAssetService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.UUID; @@ -104,22 +98,11 @@ public class AssetEdgeProcessor extends BaseAssetProcessor { private void pushAssetCreatedEventToRuleEngine(TenantId tenantId, Edge edge, AssetId assetId) { try { Asset asset = assetService.findAssetById(tenantId, assetId); - ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(asset); - TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, assetId, asset.getCustomerId(), - getActionTbMsgMetaData(edge, asset.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, assetId, tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, asset); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, asset, t); - } - }); - } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}][{}] Failed to push asset action to rule engine: {}", tenantId, assetId, DataConstants.ENTITY_CREATED, e); + String assetAsString = JacksonUtil.toString(asset); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, asset.getCustomerId()); + pushEntityCreatedEventToRuleEngine(tenantId, assetId, asset.getCustomerId(), assetAsString, msgMetaData); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push asset action to rule engine: {}", tenantId, assetId, TbMsgType.ENTITY_CREATED.name(), e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java index 21865baa09..71c231fb6f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java @@ -15,15 +15,12 @@ */ package org.thingsboard.server.service.edge.rpc.processor.asset; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.edge.Edge; @@ -35,14 +32,11 @@ import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.UUID; @@ -92,22 +86,11 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor { private void pushAssetProfileCreatedEventToRuleEngine(TenantId tenantId, Edge edge, AssetProfileId assetProfileId) { try { AssetProfile assetProfile = assetProfileService.findAssetProfileById(tenantId, assetProfileId); - ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(assetProfile); - TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, assetProfileId, getTbMsgMetaData(edge), - TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, assetProfileId, tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, assetProfile); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, assetProfile, t); - } - }); - } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}][{}] Failed to push asset profile action to rule engine: {}", tenantId, assetProfileId, DataConstants.ENTITY_CREATED, e); + String assetProfileAsString = JacksonUtil.toString(assetProfile); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null); + pushEntityCreatedEventToRuleEngine(tenantId, assetProfileId, null, assetProfileAsString, msgMetaData); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push asset profile action to rule engine: {}", tenantId, assetProfileId, TbMsgType.ENTITY_CREATED.name(), e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java index ae9d7e5c33..40cc6169b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java @@ -15,15 +15,12 @@ */ package org.thingsboard.server.service.edge.rpc.processor.dashboard; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Dashboard; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.ShortCustomerInfo; import org.thingsboard.server.common.data.edge.Edge; @@ -32,14 +29,11 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.Set; @@ -96,22 +90,11 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor { private void pushDashboardCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DashboardId dashboardId) { try { Dashboard dashboard = dashboardService.findDashboardById(tenantId, dashboardId); - ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(dashboard); - TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, dashboardId, null, - getActionTbMsgMetaData(edge, null), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, dashboardId, tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, dashboard); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, dashboard, t); - } - }); - } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}][{}] Failed to push dashboard action to rule engine: {}", tenantId, dashboardId, DataConstants.ENTITY_CREATED, e); + String dashboardAsString = JacksonUtil.toString(dashboard); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null); + pushEntityCreatedEventToRuleEngine(tenantId, dashboardId, null, dashboardAsString, msgMetaData); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push dashboard action to rule engine: {}", tenantId, dashboardId, TbMsgType.ENTITY_CREATED.name(), e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 95bd292356..cbc8c57e51 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -113,21 +113,10 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceId deviceId) { try { Device device = deviceService.findDeviceById(tenantId, deviceId); - ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device); - TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, device.getCustomerId(), - getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, device); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, device, t); - } - }); - } catch (JsonProcessingException | IllegalArgumentException e) { + String deviceAsString = JacksonUtil.toString(device); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, device.getCustomerId()); + pushEntityCreatedEventToRuleEngine(tenantId, deviceId, device.getCustomerId(), deviceAsString, msgMetaData); + } catch (Exception e) { log.warn("[{}][{}] Failed to push device action to rule engine: {}", tenantId, deviceId, TbMsgType.ENTITY_CREATED.name(), e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java index fe126afd6d..6feeee625d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java @@ -15,15 +15,12 @@ */ package org.thingsboard.server.service.edge.rpc.processor.device; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; @@ -35,14 +32,11 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.UUID; @@ -92,22 +86,11 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor { private void pushDeviceProfileCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceProfileId deviceProfileId) { try { DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); - ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(deviceProfile); - TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceProfileId, getTbMsgMetaData(edge), - TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, deviceProfileId, tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, deviceProfile); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, deviceProfile, t); - } - }); - } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}][{}] Failed to push device profile action to rule engine: {}", tenantId, deviceProfileId, DataConstants.ENTITY_CREATED, e); + String deviceProfileAsString = JacksonUtil.toString(deviceProfile); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null); + pushEntityCreatedEventToRuleEngine(tenantId, deviceProfileId, null, deviceProfileAsString, msgMetaData); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push device profile action to rule engine: {}", tenantId, deviceProfileId, TbMsgType.ENTITY_CREATED.name(), e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java index be6d333332..6e624bce5f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java @@ -15,15 +15,12 @@ */ package org.thingsboard.server.service.edge.rpc.processor.entityview; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.edge.Edge; @@ -34,14 +31,11 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EntityViewUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.UUID; @@ -90,7 +84,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor { Boolean created = resultPair.getFirst(); if (created) { createRelationFromEdge(tenantId, edge.getId(), entityViewId); - pushAssetCreatedEventToRuleEngine(tenantId, edge, entityViewId); + pushEntityViewCreatedEventToRuleEngine(tenantId, edge, entityViewId); entityViewService.assignEntityViewToEdge(tenantId, entityViewId, edge.getId()); } Boolean assetNameUpdated = resultPair.getSecond(); @@ -99,25 +93,14 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor { } } - private void pushAssetCreatedEventToRuleEngine(TenantId tenantId, Edge edge, EntityViewId entityViewId) { + private void pushEntityViewCreatedEventToRuleEngine(TenantId tenantId, Edge edge, EntityViewId entityViewId) { try { EntityView entityView = entityViewService.findEntityViewById(tenantId, entityViewId); - ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(entityView); - TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, entityViewId, entityView.getCustomerId(), - getActionTbMsgMetaData(edge, entityView.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, entityViewId, tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entityView); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entityView, t); - } - }); - } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}][{}] Failed to push entity view action to rule engine: {}", tenantId, entityViewId, DataConstants.ENTITY_CREATED, e); + String entityViewAsString = JacksonUtil.toString(entityView); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, entityView.getCustomerId()); + pushEntityCreatedEventToRuleEngine(tenantId, entityViewId, entityView.getCustomerId(), entityViewAsString, msgMetaData); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push entity view action to rule engine: {}", tenantId, entityViewId, TbMsgType.ENTITY_CREATED.name(), e); } }