|
|
|
@ -25,7 +25,9 @@ import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.thingsboard.rule.engine.api.*; |
|
|
|
import org.thingsboard.rule.engine.api.util.DonAsynchron; |
|
|
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|
|
|
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.kv.BaseTsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.TsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.plugin.ComponentType; |
|
|
|
@ -81,7 +83,7 @@ public class TbGetTelemetryNode implements TbNode { |
|
|
|
ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!")); |
|
|
|
} else { |
|
|
|
try { |
|
|
|
List<TsKvQuery> queries = buildQueries(); |
|
|
|
List<ReadTsKvQuery> queries = buildQueries(); |
|
|
|
ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(msg.getOriginator(), queries); |
|
|
|
DonAsynchron.withCallback(list, data -> { |
|
|
|
process(data, msg); |
|
|
|
@ -95,13 +97,13 @@ public class TbGetTelemetryNode implements TbNode { |
|
|
|
} |
|
|
|
|
|
|
|
//TODO: handle direction;
|
|
|
|
private List<TsKvQuery> buildQueries() { |
|
|
|
private List<ReadTsKvQuery> buildQueries() { |
|
|
|
long ts = System.currentTimeMillis(); |
|
|
|
long startTs = ts - startTsOffset; |
|
|
|
long endTs = ts - endTsOffset; |
|
|
|
|
|
|
|
return tsKeyNames.stream() |
|
|
|
.map(key -> new BaseTsKvQuery(key, startTs, endTs, 1, limit, NONE)) |
|
|
|
.map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE)) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
} |
|
|
|
|
|
|
|
|