Browse Source

Edqs - VersionStore - Use local cache instead of caffeine to reduce memory heap size

pull/13506/head
Volodymyr Babak 8 months ago
parent
commit
46a58ca82b
  1. 1
      common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java
  2. 47
      common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java

1
common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java

@ -277,6 +277,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
eventConsumer.awaitStop();
responseTemplate.stop();
stateService.stop();
versionsStore.shutdown();
}
}

47
common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java

@ -15,31 +15,35 @@
*/
package org.thingsboard.server.edqs.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class VersionsStore {
private final Cache<EdqsObjectKey, Long> versions;
private final ConcurrentMap<EdqsObjectKey, TimedValue<Long>> versions = new ConcurrentHashMap<>();
private final long expirationMillis;
private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
public VersionsStore(int ttlMinutes) {
this.versions = Caffeine.newBuilder()
.expireAfterWrite(ttlMinutes, TimeUnit.MINUTES)
.build();
this.expirationMillis = TimeUnit.MINUTES.toMillis(ttlMinutes);
startCleanupTask();
}
public boolean isNew(EdqsObjectKey key, Long version) {
AtomicBoolean isNew = new AtomicBoolean(false);
versions.asMap().compute(key, (k, prevVersion) -> {
if (prevVersion == null || prevVersion <= version) {
versions.compute(key, (k, prevVersion) -> {
if (prevVersion == null || prevVersion.value <= version) {
isNew.set(true);
return version;
return new TimedValue<>(version);
} else {
log.debug("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion);
return prevVersion;
@ -48,4 +52,29 @@ public class VersionsStore {
return isNew.get();
}
private void startCleanupTask() {
cleaner.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();
for (Map.Entry<EdqsObjectKey, TimedValue<Long>> entry : versions.entrySet()) {
if (now - entry.getValue().lastUpdated > expirationMillis) {
versions.remove(entry.getKey(), entry.getValue());
}
}
}, expirationMillis, expirationMillis, TimeUnit.MILLISECONDS);
}
public void shutdown() {
cleaner.shutdown();
}
private static class TimedValue<V> {
private final long lastUpdated;
private final V value;
public TimedValue(V value) {
this.value = value;
this.lastUpdated = System.currentTimeMillis();
}
}
}

Loading…
Cancel
Save