From 808707afa4db07ad39c679e3e3727154ac71c4fb Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 23 Jun 2020 11:10:53 +0300 Subject: [PATCH] Implementation and tests for Latest Subscription --- ...efaultTbEntityDataSubscriptionService.java | 12 +++- .../subscription/TbEntityDataSubCtx.java | 61 +++++++++++++++---- .../TelemetryWebSocketSessionRef.java | 4 ++ .../controller/BaseWebsocketApiTest.java | 19 ++++-- .../controller/TbTestWebSocketClient.java | 51 +++++++--------- application/src/test/resources/logback.xml | 2 + .../common/data/query/EntityDataQuery.java | 2 + 7 files changed, 103 insertions(+), 48 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 4fd46195d0..404b4f59c4 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -66,6 +66,7 @@ import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -169,7 +170,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc if (ctx != null) { log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null) { - ctx.clearSubscriptions(); + Collection oldSubIds = ctx.clearSubscriptions(); + oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); } //TODO: cleanup old subscription; } else { @@ -195,10 +197,16 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc }); } PageData data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery()); + if (log.isTraceEnabled()) { + data.getData().forEach(ed -> { + log.trace("[{}][{}] EntityData: {}", session.getSessionId(), cmd.getCmdId(), ed); + }); + } ctx.setData(data); } ListenableFuture historyFuture; if (cmd.getHistoryCmd() != null) { + log.trace("[{}][{}] Going to process history command: {}", session.getSessionId(), cmd.getCmdId(), cmd.getHistoryCmd()); historyFuture = handleHistoryCmd(ctx, cmd.getHistoryCmd()); } else { historyFuture = Futures.immediateFuture(ctx); @@ -241,8 +249,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) { + log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. if (!tsInSqlDB) { + log.trace("[{}][{}] Going to fetch missing latest values: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); List allTsKeys = latestCmd.getKeys().stream() .filter(key -> key.getType().equals(EntityKeyType.TIME_SERIES)) .map(EntityKey::getKey).collect(Collectors.toList()); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 98ba5c954f..403c34449e 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.subscription; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -34,12 +35,13 @@ import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +@Slf4j @Data public class TbEntityDataSubCtx { @@ -54,7 +56,6 @@ public class TbEntityDataSubCtx { private PageData data; private boolean initialDataSent; private List tbSubs; - private int internalSubIdx; private Map subToEntityIdMap; public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) { @@ -82,7 +83,6 @@ public class TbEntityDataSubCtx { public List createSubscriptions(List keys) { this.subToEntityIdMap = new HashMap<>(); - this.internalSubIdx = cmdId * MAX_SUBS_PER_CMD; tbSubs = new ArrayList<>(); List attrSubKeys = new ArrayList<>(); List tsSubKeys = new ArrayList<>(); @@ -107,20 +107,28 @@ public class TbEntityDataSubCtx { } private TbSubscription createTsSub(EntityData entityData, List tsSubKeys) { - int subIdx = internalSubIdx++; + int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); subToEntityIdMap.put(subIdx, entityData.getEntityId()); Map keyStates = new HashMap<>(); tsSubKeys.forEach(key -> keyStates.put(key.getKey(), 0L)); if (entityData.getLatest() != null) { Map currentValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES); if (currentValues != null) { - currentValues.forEach((k, v) -> keyStates.put(k, v.getTs())); + currentValues.forEach((k, v) -> { + log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, v.getTs()); + keyStates.put(k, v.getTs()); + }); } } if (entityData.getTimeseries() != null) { - entityData.getTimeseries().forEach((k, v) -> keyStates.put(k, Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L))); + entityData.getTimeseries().forEach((k, v) -> { + long ts = Arrays.stream(v).map(TsValue::getTs).max(Long::compareTo).orElse(0L); + log.trace("[{}][{}] Updating key: {} with ts: {}", serviceId, cmdId, k, ts); + keyStates.put(k, ts); + }); } + log.trace("[{}][{}][{}] Creating subscription with keys: {}", serviceId, cmdId, subIdx, keyStates); return TbTimeseriesSubscription.builder() .serviceId(serviceId) .sessionId(sessionRef.getSessionId()) @@ -136,19 +144,48 @@ public class TbEntityDataSubCtx { private void sendTsWsMsg(String sessionId, SubscriptionUpdate subscriptionUpdate) { EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); if (entityId != null) { - Map latest = new HashMap<>(); + log.trace("[{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), subscriptionUpdate); + Map latestUpdate = new HashMap<>(); subscriptionUpdate.getData().forEach((k, v) -> { Object[] data = (Object[]) v.get(0); - latest.put(k, new TsValue((Long) data[0], (String) data[1])); + latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); }); - Map> latestMap = Collections.singletonMap(EntityKeyType.TIME_SERIES, latest); - EntityData entityData = new EntityData(entityId, latestMap, null); - wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); + EntityData entityData = getDataForEntity(entityId); + if (entityData != null && entityData.getLatest() != null) { + Map latestCtxValues = entityData.getLatest().get(EntityKeyType.TIME_SERIES); + log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues); + if (latestCtxValues != null) { + latestCtxValues.forEach((k, v) -> { + TsValue update = latestUpdate.get(k); + if (update.getTs() < v.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } + }); + //Setting new values + latestUpdate.forEach(latestCtxValues::put); + } + } + if (!latestUpdate.isEmpty()) { + Map> latestMap = Collections.singletonMap(EntityKeyType.TIME_SERIES, latestUpdate); + entityData = new EntityData(entityId, latestMap, null); + wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); + } + } else { + log.trace("[{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), subscriptionUpdate); } + } + private EntityData getDataForEntity(EntityId entityId) { + return data.getData().stream().filter(item -> item.getEntityId().equals(entityId)).findFirst().orElse(null); } - public void clearSubscriptions() { + public Collection clearSubscriptions() { + List oldSubIds = new ArrayList<>(subToEntityIdMap.keySet()); subToEntityIdMap.clear(); + return oldSubIds; } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java index cd70f5af4f..0c2a7cedbb 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketSessionRef.java @@ -20,6 +20,7 @@ import org.thingsboard.server.service.security.model.SecurityUser; import java.net.InetSocketAddress; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 27.03.18. @@ -36,12 +37,15 @@ public class TelemetryWebSocketSessionRef { private final InetSocketAddress localAddress; @Getter private final InetSocketAddress remoteAddress; + @Getter + private final AtomicInteger sessionSubIdSeq; public TelemetryWebSocketSessionRef(String sessionId, SecurityUser securityCtx, InetSocketAddress localAddress, InetSocketAddress remoteAddress) { this.sessionId = sessionId; this.securityCtx = securityCtx; this.localAddress = localAddress; this.remoteAddress = remoteAddress; + this.sessionSubIdSeq = new AtomicInteger(); } @Override diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index 8fb6f59fd1..0f0d550836 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -173,7 +173,6 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { } @Test - @Ignore public void testEntityDataLatestWsCmd() throws Exception { Device device = new Device(); device.setName("Device"); @@ -212,8 +211,6 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { sendTelemetry(device, tsData); cmd = new EntityDataCmd(1, edq, null, latestCmd, null); - - wrapper = new TelemetryPluginCmdsWrapper(); wrapper.setEntityDataCmds(Collections.singletonList(cmd)); @@ -231,11 +228,12 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature"); Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsValue); - log.error("GOING TO LISTEN FOR UPDATES"); - msg = wsClient.waitForUpdate(); now = System.currentTimeMillis(); TsKvEntry dataPoint2 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 52L)); + + wsClient.registerWaitForUpdate(); sendTelemetry(device, Arrays.asList(dataPoint2)); + msg = wsClient.waitForUpdate(); update = mapper.readValue(msg, EntityDataUpdate.class); Assert.assertEquals(1, update.getCmdId()); @@ -247,6 +245,17 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature"); Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsValue); + //Sending update from the past, while latest value has new timestamp; + wsClient.registerWaitForUpdate(); + sendTelemetry(device, Arrays.asList(dataPoint1)); + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1)); + Assert.assertNull(msg); + + //Sending duplicate update again + wsClient.registerWaitForUpdate(); + sendTelemetry(device, Arrays.asList(dataPoint2)); + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1)); + Assert.assertNull(msg); } } diff --git a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java index cf1c31c851..10f72a406b 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java +++ b/application/src/test/java/org/thingsboard/server/controller/TbTestWebSocketClient.java @@ -27,9 +27,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public class TbTestWebSocketClient extends WebSocketClient { - private volatile String lastReply; - private volatile String lastUpdate; - private volatile boolean replyReceived; + private volatile String lastMsg; private CountDownLatch reply; private CountDownLatch update; @@ -44,23 +42,13 @@ public class TbTestWebSocketClient extends WebSocketClient { @Override public void onMessage(String s) { - log.error("RECEIVED: {}", s); - synchronized (this) { - if (!replyReceived) { - replyReceived = true; - lastReply = s; - log.error("LAST REPLY: {}", s); - if (reply != null) { - reply.countDown(); - } - } else { - lastUpdate = s; - log.error("LAST UPDATE: {}", s); - if (update == null) { - update = new CountDownLatch(1); - } - update.countDown(); - } + log.info("RECEIVED: {}", s); + lastMsg = s; + if (reply != null) { + reply.countDown(); + } + if (update != null) { + update.countDown(); } } @@ -74,25 +62,28 @@ public class TbTestWebSocketClient extends WebSocketClient { } + public void registerWaitForUpdate() { + lastMsg = null; + update = new CountDownLatch(1); + } + @Override public void send(String text) throws NotYetConnectedException { - synchronized (this) { - reply = new CountDownLatch(1); - replyReceived = false; - } + reply = new CountDownLatch(1); super.send(text); } public String waitForUpdate() { - synchronized (this) { - update = new CountDownLatch(1); - } + return waitForUpdate(TimeUnit.SECONDS.toMillis(3)); + } + + public String waitForUpdate(long ms) { try { - update.await(3, TimeUnit.SECONDS); + update.await(ms, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.warn("Failed to await reply", e); } - return lastUpdate; + return lastMsg; } public String waitForReply() { @@ -101,6 +92,6 @@ public class TbTestWebSocketClient extends WebSocketClient { } catch (InterruptedException e) { log.warn("Failed to await reply", e); } - return lastReply; + return lastMsg; } } diff --git a/application/src/test/resources/logback.xml b/application/src/test/resources/logback.xml index 4f083906e5..f991a40078 100644 --- a/application/src/test/resources/logback.xml +++ b/application/src/test/resources/logback.xml @@ -7,6 +7,8 @@ + + diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java index 01c695ceaf..e0c1d12699 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataQuery.java @@ -17,9 +17,11 @@ package org.thingsboard.server.common.data.query; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Getter; +import lombok.ToString; import java.util.List; +@ToString public class EntityDataQuery extends EntityCountQuery { @Getter