Browse Source

Move pushEntityCreatedEventToRuleEngine method to base processor

pull/9175/head
Volodymyr Babak 3 years ago
parent
commit
c7b0878ac1
  1. 30
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java
  2. 29
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java
  3. 29
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java
  4. 29
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java
  5. 19
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java
  6. 29
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java
  7. 33
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java

30
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java

@ -46,12 +46,15 @@ import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetProfileService;
@ -78,6 +81,8 @@ import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor;
@ -566,18 +571,29 @@ public abstract class BaseEdgeProcessor {
relationService.saveRelation(tenantId, relation);
}
protected TbMsgMetaData getActionTbMsgMetaData(Edge edge, CustomerId customerId) {
TbMsgMetaData metaData = getTbMsgMetaData(edge);
protected TbMsgMetaData getEdgeActionTbMsgMetaData(Edge edge, CustomerId customerId) {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("edgeId", edge.getId().toString());
metaData.putValue("edgeName", edge.getName());
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
return metaData;
}
protected TbMsgMetaData getTbMsgMetaData(Edge edge) {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("edgeId", edge.getId().toString());
metaData.putValue("edgeName", edge.getName());
return metaData;
protected void pushEntityCreatedEventToRuleEngine(TenantId tenantId, EntityId entityId, CustomerId customerId,
String entity, TbMsgMetaData metaData) {
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, entityId, customerId, metaData, TbMsgDataType.JSON, entity);
tbClusterService.pushMsgToRuleEngine(tenantId, entityId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entity);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entity, t);
}
});
}
}

29
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java

@ -15,15 +15,12 @@
*/
package org.thingsboard.server.service.edge.rpc.processor.asset;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
@ -35,15 +32,12 @@ import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.asset.BaseAssetService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.UUID;
@ -104,22 +98,11 @@ public class AssetEdgeProcessor extends BaseAssetProcessor {
private void pushAssetCreatedEventToRuleEngine(TenantId tenantId, Edge edge, AssetId assetId) {
try {
Asset asset = assetService.findAssetById(tenantId, assetId);
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(asset);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, assetId, asset.getCustomerId(),
getActionTbMsgMetaData(edge, asset.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, assetId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, asset);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, asset, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}][{}] Failed to push asset action to rule engine: {}", tenantId, assetId, DataConstants.ENTITY_CREATED, e);
String assetAsString = JacksonUtil.toString(asset);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, asset.getCustomerId());
pushEntityCreatedEventToRuleEngine(tenantId, assetId, asset.getCustomerId(), assetAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push asset action to rule engine: {}", tenantId, assetId, TbMsgType.ENTITY_CREATED.name(), e);
}
}

29
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java

@ -15,15 +15,12 @@
*/
package org.thingsboard.server.service.edge.rpc.processor.asset;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.edge.Edge;
@ -35,14 +32,11 @@ import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.UUID;
@ -92,22 +86,11 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
private void pushAssetProfileCreatedEventToRuleEngine(TenantId tenantId, Edge edge, AssetProfileId assetProfileId) {
try {
AssetProfile assetProfile = assetProfileService.findAssetProfileById(tenantId, assetProfileId);
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(assetProfile);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, assetProfileId, getTbMsgMetaData(edge),
TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, assetProfileId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, assetProfile);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, assetProfile, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}][{}] Failed to push asset profile action to rule engine: {}", tenantId, assetProfileId, DataConstants.ENTITY_CREATED, e);
String assetProfileAsString = JacksonUtil.toString(assetProfile);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null);
pushEntityCreatedEventToRuleEngine(tenantId, assetProfileId, null, assetProfileAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push asset profile action to rule engine: {}", tenantId, assetProfileId, TbMsgType.ENTITY_CREATED.name(), e);
}
}

29
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java

@ -15,15 +15,12 @@
*/
package org.thingsboard.server.service.edge.rpc.processor.dashboard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.ShortCustomerInfo;
import org.thingsboard.server.common.data.edge.Edge;
@ -32,14 +29,11 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Set;
@ -96,22 +90,11 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor {
private void pushDashboardCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DashboardId dashboardId) {
try {
Dashboard dashboard = dashboardService.findDashboardById(tenantId, dashboardId);
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(dashboard);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, dashboardId, null,
getActionTbMsgMetaData(edge, null), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, dashboardId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, dashboard);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, dashboard, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}][{}] Failed to push dashboard action to rule engine: {}", tenantId, dashboardId, DataConstants.ENTITY_CREATED, e);
String dashboardAsString = JacksonUtil.toString(dashboard);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null);
pushEntityCreatedEventToRuleEngine(tenantId, dashboardId, null, dashboardAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push dashboard action to rule engine: {}", tenantId, dashboardId, TbMsgType.ENTITY_CREATED.name(), e);
}
}

19
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java

@ -113,21 +113,10 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceId deviceId) {
try {
Device device = deviceService.findDeviceById(tenantId, deviceId);
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, device.getCustomerId(),
getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, device);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, device, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
String deviceAsString = JacksonUtil.toString(device);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, device.getCustomerId());
pushEntityCreatedEventToRuleEngine(tenantId, deviceId, device.getCustomerId(), deviceAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push device action to rule engine: {}", tenantId, deviceId, TbMsgType.ENTITY_CREATED.name(), e);
}
}

29
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java

@ -15,15 +15,12 @@
*/
package org.thingsboard.server.service.edge.rpc.processor.device;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge;
@ -35,14 +32,11 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.UUID;
@ -92,22 +86,11 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
private void pushDeviceProfileCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceProfileId deviceProfileId) {
try {
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(deviceProfile);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceProfileId, getTbMsgMetaData(edge),
TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceProfileId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, deviceProfile);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, deviceProfile, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}][{}] Failed to push device profile action to rule engine: {}", tenantId, deviceProfileId, DataConstants.ENTITY_CREATED, e);
String deviceProfileAsString = JacksonUtil.toString(deviceProfile);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null);
pushEntityCreatedEventToRuleEngine(tenantId, deviceProfileId, null, deviceProfileAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push device profile action to rule engine: {}", tenantId, deviceProfileId, TbMsgType.ENTITY_CREATED.name(), e);
}
}

33
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java

@ -15,15 +15,12 @@
*/
package org.thingsboard.server.service.edge.rpc.processor.entityview;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.edge.Edge;
@ -34,14 +31,11 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EntityViewUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.UUID;
@ -90,7 +84,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
Boolean created = resultPair.getFirst();
if (created) {
createRelationFromEdge(tenantId, edge.getId(), entityViewId);
pushAssetCreatedEventToRuleEngine(tenantId, edge, entityViewId);
pushEntityViewCreatedEventToRuleEngine(tenantId, edge, entityViewId);
entityViewService.assignEntityViewToEdge(tenantId, entityViewId, edge.getId());
}
Boolean assetNameUpdated = resultPair.getSecond();
@ -99,25 +93,14 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
}
}
private void pushAssetCreatedEventToRuleEngine(TenantId tenantId, Edge edge, EntityViewId entityViewId) {
private void pushEntityViewCreatedEventToRuleEngine(TenantId tenantId, Edge edge, EntityViewId entityViewId) {
try {
EntityView entityView = entityViewService.findEntityViewById(tenantId, entityViewId);
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(entityView);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, entityViewId, entityView.getCustomerId(),
getActionTbMsgMetaData(edge, entityView.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, entityViewId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("[{}] Successfully send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entityView);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to send ENTITY_CREATED EVENT to rule engine [{}]", tenantId, entityView, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}][{}] Failed to push entity view action to rule engine: {}", tenantId, entityViewId, DataConstants.ENTITY_CREATED, e);
String entityViewAsString = JacksonUtil.toString(entityView);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, entityView.getCustomerId());
pushEntityCreatedEventToRuleEngine(tenantId, entityViewId, entityView.getCustomerId(), entityViewAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push entity view action to rule engine: {}", tenantId, entityViewId, TbMsgType.ENTITY_CREATED.name(), e);
}
}

Loading…
Cancel
Save