|
|
|
@ -86,26 +86,33 @@ public class TbMsgPushToEdgeNode implements TbNode { |
|
|
|
Futures.addCallback(getEdgeIdFuture, new FutureCallback<EdgeId>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable EdgeId edgeId) { |
|
|
|
EdgeEvent edgeEvent = null; |
|
|
|
try { |
|
|
|
edgeEvent = buildEdgeEvent(msg, ctx); |
|
|
|
edgeEvent.setEdgeId(edgeId); |
|
|
|
EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); |
|
|
|
if (edgeEvent == null) { |
|
|
|
log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); |
|
|
|
ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); |
|
|
|
} else { |
|
|
|
edgeEvent.setEdgeId(edgeId); |
|
|
|
ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); |
|
|
|
Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable EdgeEvent event) { |
|
|
|
ctx.tellNext(msg, SUCCESS); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable th) { |
|
|
|
log.error("Could not save edge event", th); |
|
|
|
ctx.tellFailure(msg, th); |
|
|
|
} |
|
|
|
}, ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
} catch (JsonProcessingException e) { |
|
|
|
log.error("Failed to build edge event", e); |
|
|
|
ctx.tellFailure(msg, e); |
|
|
|
} |
|
|
|
ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); |
|
|
|
Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable EdgeEvent event) { |
|
|
|
ctx.tellNext(msg, SUCCESS); |
|
|
|
} |
|
|
|
@Override |
|
|
|
public void onFailure(Throwable th) { |
|
|
|
log.error("Could not save edge event", th); |
|
|
|
ctx.tellFailure(msg, th); |
|
|
|
} |
|
|
|
}, ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
ctx.tellFailure(msg, t); |
|
|
|
@ -128,8 +135,7 @@ public class TbMsgPushToEdgeNode implements TbNode { |
|
|
|
} else { |
|
|
|
EdgeEventType edgeEventTypeByEntityType = EdgeUtils.getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType()); |
|
|
|
if (edgeEventTypeByEntityType == null) { |
|
|
|
log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); |
|
|
|
ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); |
|
|
|
return null; |
|
|
|
} |
|
|
|
return buildEdgeEvent(ctx.getTenantId(), getActionTypeByMsgType(msg.getType()), msg.getOriginator().getId(), edgeEventTypeByEntityType, json.readTree(msg.getData())); |
|
|
|
} |
|
|
|
|