|
|
|
@ -223,26 +223,29 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
} |
|
|
|
latch.countDown(); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Can't process downlink response message [{}]", msg, e); |
|
|
|
log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void sendResponseMsg(ResponseMsg responseMsg) { |
|
|
|
log.trace("[{}] Sending response msg [{}]", this.sessionId, responseMsg); |
|
|
|
if (isConnected()) { |
|
|
|
try { |
|
|
|
responseMsgLock.lock(); |
|
|
|
outputStream.onNext(responseMsg); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Failed to send response message [{}]", responseMsg, e); |
|
|
|
log.error("[{}] Failed to send response message [{}]", this.sessionId, responseMsg, e); |
|
|
|
connected = false; |
|
|
|
sessionCloseListener.accept(edge.getId()); |
|
|
|
} finally { |
|
|
|
responseMsgLock.unlock(); |
|
|
|
} |
|
|
|
log.trace("[{}] Response msg successfully sent [{}]", this.sessionId, responseMsg); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void onConfigurationUpdate(Edge edge) { |
|
|
|
log.debug("[{}] onConfigurationUpdate [{}]", this.sessionId, edge); |
|
|
|
try { |
|
|
|
this.edge = edge; |
|
|
|
EdgeUpdateMsg edgeConfig = EdgeUpdateMsg.newBuilder() |
|
|
|
@ -251,13 +254,15 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.setEdgeUpdateMsg(edgeConfig) |
|
|
|
.build()); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Failed to construct proto objects!", e); |
|
|
|
log.error("[{}] Failed to construct proto objects!", this.sessionId, e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void processHandleMessages() throws ExecutionException, InterruptedException { |
|
|
|
log.trace("[{}] processHandleMessages started", this.sessionId); |
|
|
|
if (isConnected()) { |
|
|
|
Long queueStartTs = getQueueStartTs().get(); |
|
|
|
log.trace("[{}] trying to find edge events using queue start ts [{}]", this.sessionId, queueStartTs); |
|
|
|
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true); |
|
|
|
TimePageData<EdgeEvent> pageData; |
|
|
|
UUID ifOffset = null; |
|
|
|
@ -267,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
if (isConnected() && !pageData.getData().isEmpty()) { |
|
|
|
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); |
|
|
|
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); |
|
|
|
log.trace("[{}] downlink msg(s) are going to be send.", downlinkMsgsPack.size()); |
|
|
|
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size()); |
|
|
|
|
|
|
|
latch = new CountDownLatch(downlinkMsgsPack.size()); |
|
|
|
for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { |
|
|
|
@ -280,14 +285,14 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
|
|
|
|
success = latch.await(10, TimeUnit.SECONDS); |
|
|
|
if (!success) { |
|
|
|
log.warn("Failed to deliver the batch: {}", downlinkMsgsPack); |
|
|
|
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack); |
|
|
|
} |
|
|
|
} |
|
|
|
if (isConnected() && (!success || pageData.hasNext())) { |
|
|
|
try { |
|
|
|
Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
log.error("Error during sleep between batches", e); |
|
|
|
log.error("[{}] Error during sleep between batches", this.sessionId, e); |
|
|
|
} |
|
|
|
if (success) { |
|
|
|
pageLink = pageData.getNextPageLink(); |
|
|
|
@ -302,15 +307,16 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
try { |
|
|
|
Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
log.error("Error during sleep", e); |
|
|
|
log.error("[{}] Error during sleep between no records interval", this.sessionId, e); |
|
|
|
} |
|
|
|
} |
|
|
|
log.trace("[{}] processHandleMessages finished", this.sessionId); |
|
|
|
} |
|
|
|
|
|
|
|
private List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) { |
|
|
|
List<DownlinkMsg> result = new ArrayList<>(); |
|
|
|
for (EdgeEvent edgeEvent : edgeEvents) { |
|
|
|
log.trace("Processing edge event [{}]", edgeEvent); |
|
|
|
log.trace("[{}] Processing edge event [{}]", this.sessionId, edgeEvent); |
|
|
|
try { |
|
|
|
DownlinkMsg downlinkMsg = null; |
|
|
|
switch (edgeEvent.getAction()) { |
|
|
|
@ -406,13 +412,14 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
} |
|
|
|
|
|
|
|
private void updateQueueStartTs(Long newStartTs) { |
|
|
|
log.trace("[{}] updating QueueStartTs [{}][{}]", this.sessionId, edge.getId(), newStartTs); |
|
|
|
newStartTs = ++newStartTs; // increments ts by 1 - next edge event search starts from current offset + 1
|
|
|
|
List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis())); |
|
|
|
ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes); |
|
|
|
} |
|
|
|
|
|
|
|
private DownlinkMsg processTelemetryMessage(EdgeEvent edgeEvent) { |
|
|
|
log.trace("Executing processTelemetryMessage, edgeEvent [{}]", edgeEvent); |
|
|
|
log.trace("[{}] Executing processTelemetryMessage, edgeEvent [{}]", this.sessionId, edgeEvent); |
|
|
|
EntityId entityId = null; |
|
|
|
switch (edgeEvent.getType()) { |
|
|
|
case DEVICE: |
|
|
|
@ -436,11 +443,11 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
} |
|
|
|
DownlinkMsg downlinkMsg = null; |
|
|
|
if (entityId != null) { |
|
|
|
log.debug("Sending telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getBody()); |
|
|
|
log.debug("[{}] Sending telemetry data msg, entityId [{}], body [{}]", this.sessionId, edgeEvent.getEntityId(), edgeEvent.getBody()); |
|
|
|
try { |
|
|
|
downlinkMsg = constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), JsonUtils.parse(mapper.writeValueAsString(edgeEvent.getBody()))); |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("Can't send telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getBody(), e); |
|
|
|
log.warn("[{}] Can't send telemetry data msg, entityId [{}], body [{}]", this.sessionId, edgeEvent.getEntityId(), edgeEvent.getBody(), e); |
|
|
|
} |
|
|
|
} |
|
|
|
return downlinkMsg; |
|
|
|
@ -450,9 +457,6 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
UpdateMsgType msgType = getResponseMsgType(edgeEvent.getAction()); |
|
|
|
log.trace("Executing processEntityMessage, edgeEvent [{}], action [{}], msgType [{}]", edgeEvent, action, msgType); |
|
|
|
switch (edgeEvent.getType()) { |
|
|
|
case EDGE: |
|
|
|
// TODO: voba - add edge update logic
|
|
|
|
return null; |
|
|
|
case DEVICE: |
|
|
|
return processDevice(edgeEvent, msgType, action); |
|
|
|
case ASSET: |
|
|
|
@ -523,6 +527,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] device processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -554,6 +559,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] asset processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -585,6 +591,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] entity view processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -619,6 +626,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] dashboard processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -645,6 +653,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] customer processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -671,6 +680,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] rule chain processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -688,6 +698,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
} |
|
|
|
} |
|
|
|
log.trace("[{}] rule chain metadata processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -720,6 +731,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
} |
|
|
|
} |
|
|
|
log.trace("[{}] user processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -734,9 +746,11 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
private DownlinkMsg processRelation(EdgeEvent edgeEvent, UpdateMsgType msgType) { |
|
|
|
EntityRelation entityRelation = mapper.convertValue(edgeEvent.getBody(), EntityRelation.class); |
|
|
|
RelationUpdateMsg r = ctx.getRelationMsgConstructor().constructRelationUpdatedMsg(msgType, entityRelation); |
|
|
|
return DownlinkMsg.newBuilder() |
|
|
|
DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() |
|
|
|
.addAllRelationUpdateMsg(Collections.singletonList(r)) |
|
|
|
.build(); |
|
|
|
log.trace("[{}] relation processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
private DownlinkMsg processAlarm(EdgeEvent edgeEvent, UpdateMsgType msgType) { |
|
|
|
@ -752,6 +766,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e); |
|
|
|
} |
|
|
|
log.trace("[{}] alarm processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -778,6 +793,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] widget bundle processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
@ -804,15 +820,18 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.build(); |
|
|
|
break; |
|
|
|
} |
|
|
|
log.trace("[{}] widget type processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
private DownlinkMsg processAdminSettings(EdgeEvent edgeEvent) { |
|
|
|
AdminSettings adminSettings = mapper.convertValue(edgeEvent.getBody(), AdminSettings.class); |
|
|
|
AdminSettingsUpdateMsg t = ctx.getAdminSettingsMsgConstructor().constructAdminSettingsUpdateMsg(adminSettings); |
|
|
|
return DownlinkMsg.newBuilder() |
|
|
|
DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() |
|
|
|
.addAllAdminSettingsUpdateMsg(Collections.singletonList(t)) |
|
|
|
.build(); |
|
|
|
log.trace("[{}] admin settings processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
private UpdateMsgType getResponseMsgType(EdgeEventActionType actionType) { |
|
|
|
@ -841,9 +860,11 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
|
|
|
|
private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) { |
|
|
|
EntityDataProto entityDataProto = ctx.getEntityDataMsgConstructor().constructEntityDataMsg(entityId, actionType, entityData); |
|
|
|
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() |
|
|
|
.addAllEntityData(Collections.singletonList(entityDataProto)); |
|
|
|
return builder.build(); |
|
|
|
DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() |
|
|
|
.addAllEntityData(Collections.singletonList(entityDataProto)) |
|
|
|
.build(); |
|
|
|
log.trace("[{}] entity data proto processed [{}]", this.sessionId, downlinkMsg); |
|
|
|
return downlinkMsg; |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<List<Void>> processUplinkMsg(UplinkMsg uplinkMsg) { |
|
|
|
@ -854,7 +875,6 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), entityData)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (uplinkMsg.getDeviceUpdateMsgList() != null && !uplinkMsg.getDeviceUpdateMsgList().isEmpty()) { |
|
|
|
for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { |
|
|
|
result.add(ctx.getDeviceProcessor().onDeviceUpdate(edge.getTenantId(), edge, deviceUpdateMsg)); |
|
|
|
@ -906,12 +926,13 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
} |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("Can't process uplink msg [{}]", uplinkMsg, e); |
|
|
|
log.error("[{}] Can't process uplink msg [{}]", this.sessionId, uplinkMsg, e); |
|
|
|
} |
|
|
|
return Futures.allAsList(result); |
|
|
|
} |
|
|
|
|
|
|
|
private ConnectResponseMsg processConnect(ConnectRequestMsg request) { |
|
|
|
log.trace("[{}] processConnect [{}]", this.sessionId, request); |
|
|
|
Optional<Edge> optional = ctx.getEdgeService().findEdgeByRoutingKey(TenantId.SYS_TENANT_ID, request.getEdgeRoutingKey()); |
|
|
|
if (optional.isPresent()) { |
|
|
|
edge = optional.get(); |
|
|
|
@ -941,7 +962,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
.setConfiguration(EdgeConfiguration.getDefaultInstance()).build(); |
|
|
|
} |
|
|
|
|
|
|
|
private EdgeConfiguration constructEdgeConfigProto(Edge edge) throws JsonProcessingException { |
|
|
|
private EdgeConfiguration constructEdgeConfigProto(Edge edge) { |
|
|
|
return EdgeConfiguration.newBuilder() |
|
|
|
.setEdgeIdMSB(edge.getId().getId().getMostSignificantBits()) |
|
|
|
.setEdgeIdLSB(edge.getId().getId().getLeastSignificantBits()) |
|
|
|
@ -958,6 +979,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void close() { |
|
|
|
log.debug("[{}] Closing session", sessionId); |
|
|
|
connected = false; |
|
|
|
try { |
|
|
|
outputStream.onCompleted(); |
|
|
|
|