From 3b5a4941ff6161acfc2fba95fc6a91af059231cc Mon Sep 17 00:00:00 2001 From: vparomskiy Date: Fri, 2 Mar 2018 13:27:59 +0200 Subject: [PATCH] Cassandra MsqQueue initial implementation --- .../src/main/resources/thingsboard.yml | 3 + dao/src/main/resources/cassandra/schema.cql | 65 ++++----- .../rule/engine/api/TbMsgMetaData.java | 3 +- rule-engine/rule-engine-components/pom.xml | 102 +++++++++++++ .../queue/cassandra/CassandraMsqQueue.java | 53 ++++--- .../rule/engine/queue/cassandra/MsgAck.java | 6 +- .../queue/cassandra/QueuePartitioner.java | 83 +++++++++++ .../queue/cassandra/UnprocessedMsgFilter.java | 12 +- .../cassandra/repository/AckRepository.java | 3 +- .../cassandra/repository/MsgRepository.java | 5 +- .../ProcessedPartitionRepository.java | 19 ++- .../impl/CassandraAckRepository.java | 64 +++++++++ .../impl/CassandraMsgRepository.java | 109 ++++++++++++++ ...CassandraProcessedPartitionRepository.java | 60 ++++++++ .../impl/SimpleAbstractCassandraDao.java | 77 ++++++++++ .../rule/engine/tool/QueueBenchmark.java | 136 ++++++++++++++++++ .../AckBuilder.java => proto/msgqueue.proto} | 25 ++-- .../cassandra/CassandraMsqQueueTest.java | 48 +++++++ .../queue/cassandra/QueuePartitionerTest.java | 81 +++++++++++ .../cassandra/UnprocessedMsgFilterTest.java | 45 ++++++ .../impl/CassandraAckRepositoryTest.java | 81 +++++++++++ .../impl/CassandraMsgRepositoryTest.java | 82 +++++++++++ ...andraProcessedPartitionRepositoryTest.java | 80 +++++++++++ .../impl/SimpleAbstractCassandraDaoTest.java | 30 ++++ .../test/resources/cassandra/system-test.cql | 75 ++++++++++ 25 files changed, 1267 insertions(+), 80 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java rename rule-engine/rule-engine-components/src/main/{java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java => proto/msgqueue.proto} (59%) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDaoTest.java create mode 100644 rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6bb3917652..7eef95aad5 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -251,6 +251,9 @@ spring: username: "${SPRING_DATASOURCE_USERNAME:sa}" password: "${SPRING_DATASOURCE_PASSWORD:}" +rule: + queue: + msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}" # PostgreSQL DAO Configuration #spring: diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql index 876c9f74da..05e6cfed62 100644 --- a/dao/src/main/resources/cassandra/schema.cql +++ b/dao/src/main/resources/cassandra/schema.cql @@ -555,48 +555,45 @@ CREATE TABLE IF NOT EXISTS thingsboard.msg_queue ( partition bigint, ts bigint, msg blob, - PRIMARY KEY ((node_id, cluster_hash, partition), ts) - WITH CLUSTERING ORDER BY (ts DESC) - AND compaction = { - 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', - 'min_threshold': '5', - 'base_time_seconds': '43200', - 'max_window_size_seconds': '43200' - 'tombstone_threshold': '0.9', - 'unchecked_tombstone_compaction': 'true', - }; -); + PRIMARY KEY ((node_id, clustered_hash, partition), ts)) +WITH CLUSTERING ORDER BY (ts DESC) +AND compaction = { + 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', + 'min_threshold': '5', + 'base_time_seconds': '43200', + 'max_window_size_seconds': '43200', + 'tombstone_threshold': '0.9', + 'unchecked_tombstone_compaction': 'true' +}; + CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue ( node_id timeuuid, clustered_hash bigint, partition bigint, - ts bigint, msg_id timeuuid, - PRIMARY KEY ((node_id, cluster_hash, partition), ts) - WITH CLUSTERING ORDER BY (ts DESC) - AND compaction = { - 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', - 'min_threshold': '5', - 'base_time_seconds': '43200', - 'max_window_size_seconds': '43200' - 'tombstone_threshold': '0.9', - 'unchecked_tombstone_compaction': 'true', - }; -); + PRIMARY KEY ((node_id, clustered_hash, partition), msg_id)) +WITH CLUSTERING ORDER BY (msg_id DESC) +AND compaction = { + 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', + 'min_threshold': '5', + 'base_time_seconds': '43200', + 'max_window_size_seconds': '43200', + 'tombstone_threshold': '0.9', + 'unchecked_tombstone_compaction': 'true' +}; CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions ( node_id timeuuid, clustered_hash bigint, partition bigint, - PRIMARY KEY ((node_id, cluster_hash), partition) - WITH CLUSTERING ORDER BY (partition DESC) - AND compaction = { - 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', - 'min_threshold': '5', - 'base_time_seconds': '43200', - 'max_window_size_seconds': '43200' - 'tombstone_threshold': '0.9', - 'unchecked_tombstone_compaction': 'true', - }; -); \ No newline at end of file + PRIMARY KEY ((node_id, clustered_hash), partition)) +WITH CLUSTERING ORDER BY (partition DESC) +AND compaction = { + 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', + 'min_threshold': '5', + 'base_time_seconds': '43200', + 'max_window_size_seconds': '43200', + 'tombstone_threshold': '0.9', + 'unchecked_tombstone_compaction': 'true' +}; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java index 92d3b8e099..8dd4840a84 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java @@ -19,6 +19,7 @@ import lombok.Data; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Created by ashvayka on 13.01.18. @@ -26,7 +27,7 @@ import java.util.Map; @Data public final class TbMsgMetaData implements Serializable { - private Map data; + private Map data = new ConcurrentHashMap<>(); public String getValue(String key) { return data.get(key); diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml index ee0d83f9a7..5ee1d94358 100644 --- a/rule-engine/rule-engine-components/pom.xml +++ b/rule-engine/rule-engine-components/pom.xml @@ -63,5 +63,107 @@ rule-engine-api 1.4.0-SNAPSHOT + + com.google.protobuf + protobuf-java + provided + + + com.google.guava + guava + + + com.datastax.cassandra + cassandra-driver-core + + + com.datastax.cassandra + cassandra-driver-mapping + + + com.datastax.cassandra + cassandra-driver-extras + + + + junit + junit + ${junit.version} + test + + + org.cassandraunit + cassandra-unit + + + org.slf4j + slf4j-log4j12 + + + test + + + org.mockito + mockito-all + test + + + org.junit.jupiter + junit-jupiter-api + RELEASE + + + + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + org.thingsboard.rule.engine.tool.QueueBenchmark + boot + ZIP + true + true + + + + + + + + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java index 38ae62788b..4839761c67 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueue.java @@ -15,68 +15,65 @@ */ package org.thingsboard.rule.engine.queue.cassandra; +import com.datastax.driver.core.utils.UUIDs; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.MsqQueue; import org.thingsboard.rule.engine.api.TbMsg; import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository; import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository; -import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository; +import org.thingsboard.server.common.data.UUIDConverter; -import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.UUID; @Component +@Slf4j public class CassandraMsqQueue implements MsqQueue { - @Autowired - private MsgRepository msgRepository; + private final MsgRepository msgRepository; + private final AckRepository ackRepository; + private final UnprocessedMsgFilter unprocessedMsgFilter; + private final QueuePartitioner queuePartitioner; - @Autowired - private AckRepository ackRepository; - - @Autowired - private AckBuilder ackBuilder; - - @Autowired - private UnprocessedMsgFilter unprocessedMsgFilter; + public CassandraMsqQueue(MsgRepository msgRepository, AckRepository ackRepository, + UnprocessedMsgFilter unprocessedMsgFilter, QueuePartitioner queuePartitioner) { + this.msgRepository = msgRepository; + this.ackRepository = ackRepository; + this.unprocessedMsgFilter = unprocessedMsgFilter; + this.queuePartitioner = queuePartitioner; + } - @Autowired - private ProcessedPartitionRepository processedPartitionRepository; @Override public ListenableFuture put(TbMsg msg, UUID nodeId, long clusteredHash) { - return msgRepository.save(msg, nodeId, clusteredHash, getPartition(msg)); + long msgTime = getMsgTime(msg); + long partition = queuePartitioner.getPartition(msgTime); + return msgRepository.save(msg, nodeId, clusteredHash, partition, msgTime); } @Override public ListenableFuture ack(TbMsg msg, UUID nodeId, long clusteredHash) { - MsgAck ack = ackBuilder.build(msg, nodeId, clusteredHash); + long partition = queuePartitioner.getPartition(getMsgTime(msg)); + MsgAck ack = new MsgAck(msg.getId(), nodeId, clusteredHash, partition); return ackRepository.ack(ack); } @Override public Iterable findUnprocessed(UUID nodeId, long clusteredHash) { List unprocessedMsgs = Lists.newArrayList(); - for (Long partition : findUnprocessedPartitions(nodeId, clusteredHash)) { - Iterable msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition); - Iterable acks = ackRepository.findAcks(nodeId, clusteredHash, partition); + for (Long partition : queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash)) { + List msgs = msgRepository.findMsgs(nodeId, clusteredHash, partition); + List acks = ackRepository.findAcks(nodeId, clusteredHash, partition); unprocessedMsgs.addAll(unprocessedMsgFilter.filter(msgs, acks)); } return unprocessedMsgs; } - private List findUnprocessedPartitions(UUID nodeId, long clusteredHash) { - Optional lastPartition = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash); - return Collections.emptyList(); - } - - private long getPartition(TbMsg msg) { - return Long.MIN_VALUE; + private long getMsgTime(TbMsg msg) { + return UUIDs.unixTimestamp(msg.getId()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java index ca8d820ef7..c6885ad41b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/MsgAck.java @@ -15,17 +15,21 @@ */ package org.thingsboard.rule.engine.queue.cassandra; +import com.datastax.driver.core.utils.UUIDs; import lombok.Data; +import lombok.EqualsAndHashCode; +import org.thingsboard.rule.engine.api.TbMsg; +import org.thingsboard.server.common.data.UUIDConverter; import java.util.UUID; @Data +@EqualsAndHashCode public class MsgAck { private final UUID msgId; private final UUID nodeId; private final long clusteredHash; private final long partition; - private final long ts; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java new file mode 100644 index 0000000000..9a7886f54c --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitioner.java @@ -0,0 +1,83 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository; +import org.thingsboard.server.dao.timeseries.TsPartitionDate; + +import java.time.Clock; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +@Component +@Slf4j +public class QueuePartitioner { + + private ProcessedPartitionRepository processedPartitionRepository; + + private final TsPartitionDate tsFormat; + private Clock clock = Clock.systemUTC(); + + public QueuePartitioner(@Value("${rule.queue.msg_partitioning}") String partitioning, + ProcessedPartitionRepository processedPartitionRepository) { + this.processedPartitionRepository = processedPartitionRepository; + Optional partition = TsPartitionDate.parse(partitioning); + if (partition.isPresent()) { + tsFormat = partition.get(); + } else { + log.warn("Incorrect configuration of partitioning {}", "MINUTES"); + throw new RuntimeException("Failed to parse partitioning property: " + "MINUTES" + "!"); + } + } + + public long getPartition(long ts) { + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli(); + } + + public List findUnprocessedPartitions(UUID nodeId, long clusteredHash) { + Optional lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash); + long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100); + List unprocessedPartitions = Lists.newArrayList(); + + LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC); + LocalDateTime end = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC) + .plus(1L, tsFormat.getTruncateUnit()); + + while (current.isBefore(end)) { + current = current.plus(1L, tsFormat.getTruncateUnit()); + unprocessedPartitions.add(tsFormat.truncatedTo(current).toInstant(ZoneOffset.UTC).toEpochMilli()); + } + + return unprocessedPartitions; + } + + public void setClock(Clock clock) { + this.clock = clock; + } + + public void checkProcessedPartitions() { + //todo-vp: we need to implement this + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java index 99e9a92ba4..e114a8507d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilter.java @@ -15,14 +15,20 @@ */ package org.thingsboard.rule.engine.queue.cassandra; +import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.TbMsg; import java.util.Collection; -import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +@Component public class UnprocessedMsgFilter { - public Collection filter(Iterable msgs, Iterable acks) { - return Collections.emptyList(); + public Collection filter(List msgs, List acks) { + Set processedIds = acks.stream().map(MsgAck::getMsgId).collect(Collectors.toSet()); + return msgs.stream().filter(i -> !processedIds.contains(i.getId())).collect(Collectors.toList()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java index 3d9b55fa28..40c0416957 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/AckRepository.java @@ -18,11 +18,12 @@ package org.thingsboard.rule.engine.queue.cassandra.repository; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.queue.cassandra.MsgAck; +import java.util.List; import java.util.UUID; public interface AckRepository { ListenableFuture ack(MsgAck msgAck); - Iterable findAcks(UUID nodeId, long clusteredHash, long partition); + List findAcks(UUID nodeId, long clusteredHash, long partition); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java index 57e501eba9..5d34d84c3b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/MsgRepository.java @@ -18,12 +18,13 @@ package org.thingsboard.rule.engine.queue.cassandra.repository; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.TbMsg; +import java.util.List; import java.util.UUID; public interface MsgRepository { - ListenableFuture save(TbMsg msg, UUID nodeId, long clusteredHash, long partition); + ListenableFuture save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs); - Iterable findMsgs(UUID nodeId, long clusteredHash, long partition); + List findMsgs(UUID nodeId, long clusteredHash, long partition); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java index bc29050dad..807c0017c6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/ProcessedPartitionRepository.java @@ -1,11 +1,28 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.rule.engine.queue.cassandra.repository; +import com.google.common.util.concurrent.ListenableFuture; + import java.util.Optional; import java.util.UUID; public interface ProcessedPartitionRepository { - void partitionProcessed(UUID nodeId, long clusteredHash, long partition); + ListenableFuture partitionProcessed(UUID nodeId, long clusteredHash, long partition); Optional findLastProcessedPartition(UUID nodeId, long clusteredHash); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java new file mode 100644 index 0000000000..57dc79c49b --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepository.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +import com.datastax.driver.core.*; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.springframework.stereotype.Component; +import org.thingsboard.rule.engine.queue.cassandra.MsgAck; +import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Component +public class CassandraAckRepository extends SimpleAbstractCassandraDao implements AckRepository { + + private final int ackQueueTtl; + + public CassandraAckRepository(Session session, int ackQueueTtl) { + super(session); + this.ackQueueTtl = ackQueueTtl; + } + + @Override + public ListenableFuture ack(MsgAck msgAck) { + String insert = "INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) VALUES (?, ?, ?, ?) USING TTL ?"; + PreparedStatement statement = prepare(insert); + BoundStatement boundStatement = statement.bind(msgAck.getNodeId(), msgAck.getClusteredHash(), + msgAck.getPartition(), msgAck.getMsgId(), ackQueueTtl); + ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement); + return Futures.transform(resultSetFuture, (Function) input -> null); + } + + @Override + public List findAcks(UUID nodeId, long clusteredHash, long partition) { + String select = "SELECT msg_id FROM msg_ack_queue WHERE " + + "node_id = ? AND clustered_hash = ? AND partition = ?"; + PreparedStatement statement = prepare(select); + BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition); + ResultSet rows = executeRead(boundStatement); + List msgs = new ArrayList<>(); + for (Row row : rows) { + msgs.add(new MsgAck(row.getUUID("msg_id"), nodeId, clusteredHash, partition)); + } + return msgs; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java new file mode 100644 index 0000000000..bd3ef21c99 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepository.java @@ -0,0 +1,109 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +import com.datastax.driver.core.*; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.springframework.stereotype.Component; +import org.thingsboard.rule.engine.api.TbMsg; +import org.thingsboard.rule.engine.api.TbMsgMetaData; +import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository; +import org.thingsboard.rule.engine.queue.cassandra.repository.gen.MsgQueueProtos; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Component +public class CassandraMsgRepository extends SimpleAbstractCassandraDao implements MsgRepository { + + private final int msqQueueTtl; + + + public CassandraMsgRepository(Session session, int msqQueueTtl) { + super(session); + this.msqQueueTtl = msqQueueTtl; + } + + @Override + public ListenableFuture save(TbMsg msg, UUID nodeId, long clusteredHash, long partition, long msgTs) { + String insert = "INSERT INTO msg_queue (node_id, clustered_hash, partition, ts, msg) VALUES (?, ?, ?, ?, ?) USING TTL ?"; + PreparedStatement statement = prepare(insert); + BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition, msgTs, toBytes(msg), msqQueueTtl); + ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement); + return Futures.transform(resultSetFuture, (Function) input -> null); + } + + @Override + public List findMsgs(UUID nodeId, long clusteredHash, long partition) { + String select = "SELECT node_id, clustered_hash, partition, ts, msg FROM msg_queue WHERE " + + "node_id = ? AND clustered_hash = ? AND partition = ?"; + PreparedStatement statement = prepare(select); + BoundStatement boundStatement = statement.bind(nodeId, clusteredHash, partition); + ResultSet rows = executeRead(boundStatement); + List msgs = new ArrayList<>(); + for (Row row : rows) { + msgs.add(fromBytes(row.getBytes("msg"))); + } + return msgs; + } + + private ByteBuffer toBytes(TbMsg msg) { + MsgQueueProtos.TbMsgProto.Builder builder = MsgQueueProtos.TbMsgProto.newBuilder(); + builder.setId(msg.getId().toString()); + builder.setType(msg.getType()); + if (msg.getOriginator() != null) { + builder.setEntityType(msg.getOriginator().getEntityType().name()); + builder.setEntityId(msg.getOriginator().getId().toString()); + } + + if (msg.getMetaData() != null) { + MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgQueueProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder(); + metadataBuilder.putAllData(msg.getMetaData().getData()); + builder.addMetaData(metadataBuilder.build()); + } + + builder.setData(ByteString.copyFrom(msg.getData())); + byte[] bytes = builder.build().toByteArray(); + return ByteBuffer.wrap(bytes); + } + + private TbMsg fromBytes(ByteBuffer buffer) { + try { + MsgQueueProtos.TbMsgProto proto = MsgQueueProtos.TbMsgProto.parseFrom(buffer.array()); + TbMsgMetaData metaData = new TbMsgMetaData(); + if (proto.getMetaDataCount() > 0) { + metaData.setData(proto.getMetaData(0).getDataMap()); + } + + EntityId entityId = null; + if (proto.getEntityId() != null) { + entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()); + } + + return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray()); + } catch (InvalidProtocolBufferException e) { + throw new IllegalStateException("Could not parse protobuf for TbMsg", e); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java new file mode 100644 index 0000000000..7fc15d8207 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepository.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +import com.datastax.driver.core.*; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.springframework.stereotype.Component; +import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository; + +import java.util.Optional; +import java.util.UUID; + +@Component +public class CassandraProcessedPartitionRepository extends SimpleAbstractCassandraDao implements ProcessedPartitionRepository { + + private final int repositoryTtl; + + public CassandraProcessedPartitionRepository(Session session, int repositoryTtl) { + super(session); + this.repositoryTtl = repositoryTtl; + } + + @Override + public ListenableFuture partitionProcessed(UUID nodeId, long clusteredHash, long partition) { + String insert = "INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) VALUES (?, ?, ?) USING TTL ?"; + PreparedStatement prepared = prepare(insert); + BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash, partition, repositoryTtl); + ResultSetFuture resultSetFuture = executeAsyncWrite(boundStatement); + return Futures.transform(resultSetFuture, (Function) input -> null); + } + + @Override + public Optional findLastProcessedPartition(UUID nodeId, long clusteredHash) { + String select = "SELECT partition FROM processed_msg_partitions WHERE " + + "node_id = ? AND clustered_hash = ?"; + PreparedStatement prepared = prepare(select); + BoundStatement boundStatement = prepared.bind(nodeId, clusteredHash); + Row row = executeRead(boundStatement).one(); + if (row == null) { + return Optional.empty(); + } + + return Optional.of(row.getLong("partition")); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java new file mode 100644 index 0000000000..8f01c188ea --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDao.java @@ -0,0 +1,77 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +import com.datastax.driver.core.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +@Slf4j +public abstract class SimpleAbstractCassandraDao { + + private ConsistencyLevel defaultReadLevel = ConsistencyLevel.QUORUM; + private ConsistencyLevel defaultWriteLevel = ConsistencyLevel.QUORUM; + private Session session; + private Map preparedStatementMap = new ConcurrentHashMap<>(); + + public SimpleAbstractCassandraDao(Session session) { + this.session = session; + } + + protected Session getSession() { + return session; + } + + protected ResultSet executeRead(Statement statement) { + return execute(statement, defaultReadLevel); + } + + protected ResultSet executeWrite(Statement statement) { + return execute(statement, defaultWriteLevel); + } + + protected ResultSetFuture executeAsyncRead(Statement statement) { + return executeAsync(statement, defaultReadLevel); + } + + protected ResultSetFuture executeAsyncWrite(Statement statement) { + return executeAsync(statement, defaultWriteLevel); + } + + protected PreparedStatement prepare(String query) { + return preparedStatementMap.computeIfAbsent(query, i -> getSession().prepare(i)); + } + + private ResultSet execute(Statement statement, ConsistencyLevel level) { + log.debug("Execute cassandra statement {}", statement); + if (statement.getConsistencyLevel() == null) { + statement.setConsistencyLevel(level); + } + return getSession().execute(statement); + } + + private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) { + log.debug("Execute cassandra async statement {}", statement); + if (statement.getConsistencyLevel() == null) { + statement.setConsistencyLevel(level); + } + return getSession().executeAsync(statement); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java new file mode 100644 index 0000000000..58510d721c --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/tool/QueueBenchmark.java @@ -0,0 +1,136 @@ +package org.thingsboard.rule.engine.tool; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.utils.UUIDs; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.thingsboard.rule.engine.api.MsqQueue; +import org.thingsboard.rule.engine.api.TbMsg; +import org.thingsboard.rule.engine.api.TbMsgMetaData; + +import javax.annotation.Nullable; +import java.net.InetSocketAddress; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@SpringBootApplication +@EnableAutoConfiguration +@ComponentScan({"org.thingsboard.rule.engine"}) +//@PropertySource("classpath:processing-pipeline.properties") +@Slf4j +public class QueueBenchmark implements CommandLineRunner { + + public static void main(String[] args) { + try { + SpringApplication.run(QueueBenchmark.class, args); + } catch (Throwable th) { + th.printStackTrace(); + System.exit(0); + } + } + + @Autowired + private MsqQueue msqQueue; + + @Override + public void run(String... strings) throws Exception { + System.out.println("It works + " + msqQueue); + + + long start = System.currentTimeMillis(); + int msgCount = 10000000; + AtomicLong count = new AtomicLong(0); + ExecutorService service = Executors.newFixedThreadPool(100); + + CountDownLatch latch = new CountDownLatch(msgCount); + for (int i = 0; i < msgCount; i++) { + service.submit(() -> { + boolean isFinished = false; + while (!isFinished) { + try { + TbMsg msg = randomMsg(); + UUID nodeId = UUIDs.timeBased(); + ListenableFuture put = msqQueue.put(msg, nodeId, 100L); +// ListenableFuture put = msqQueue.ack(msg, nodeId, 100L); + Futures.addCallback(put, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + latch.countDown(); + } + + @Override + public void onFailure(Throwable t) { +// t.printStackTrace(); + System.out.println("onFailure, because:" + t.getMessage()); + latch.countDown(); + } + }); + isFinished = true; + } catch (Throwable th) { +// th.printStackTrace(); + System.out.println("Repeat query, because:" + th.getMessage()); +// latch.countDown(); + } + } + }); + } + + long prev = 0L; + while (latch.getCount() != 0) { + TimeUnit.SECONDS.sleep(1); + long curr = latch.getCount(); + long rps = prev - curr; + prev = curr; + System.out.println("rps = " + rps); + } + + long end = System.currentTimeMillis(); + System.out.println("final rps = " + (msgCount / (end - start) * 1000)); + + System.out.println("Finished"); + + } + + private TbMsg randomMsg() { + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("key", "value"); + String dataStr = "someContent"; + return new TbMsg(UUIDs.timeBased(), "type", null, metaData, dataStr.getBytes()); + } + + @Bean + public Session session() { + Cluster thingsboard = Cluster.builder() + .addContactPointsWithPorts(new InetSocketAddress("127.0.0.1", 9042)) + .withClusterName("thingsboard") +// .withSocketOptions(socketOpts.getOpts()) + .withPoolingOptions(new PoolingOptions() + .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768) + .setMaxRequestsPerConnection(HostDistance.REMOTE, 32768)).build(); + + Session session = thingsboard.connect("thingsboard"); + return session; + } + + @Bean + public int defaultTtl() { + return 6000; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java b/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto similarity index 59% rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java rename to rule-engine/rule-engine-components/src/main/proto/msgqueue.proto index 4f1e187c97..1105aeb664 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/queue/cassandra/AckBuilder.java +++ b/rule-engine/rule-engine-components/src/main/proto/msgqueue.proto @@ -13,17 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.rule.engine.queue.cassandra; +syntax = "proto3"; +package msgqueue; -import org.springframework.stereotype.Component; -import org.thingsboard.rule.engine.api.TbMsg; +option java_package = "org.thingsboard.rule.engine.queue.cassandra.repository.gen"; +option java_outer_classname = "MsgQueueProtos"; -import java.util.UUID; -@Component -public class AckBuilder { +message TbMsgProto { + string id = 1; + string type = 2; + string entityType = 3; + string entityId = 4; - public MsgAck build(TbMsg msg, UUID nodeId, long clusteredHash) { - return null; + message TbMsgMetaDataProto { + map data = 1; } -} + + repeated TbMsgMetaDataProto metaData = 5; + + bytes data = 6; +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java new file mode 100644 index 0000000000..8ce3355b09 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/CassandraMsqQueueTest.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.thingsboard.rule.engine.queue.cassandra.repository.AckRepository; +import org.thingsboard.rule.engine.queue.cassandra.repository.MsgRepository; + +public class CassandraMsqQueueTest { + + private CassandraMsqQueue msqQueue; + + @Mock + private MsgRepository msgRepository; + @Mock + private AckRepository ackRepository; + @Mock + private UnprocessedMsgFilter unprocessedMsgFilter; + @Mock + private QueuePartitioner queuePartitioner; + + @Before + public void init() { + msqQueue = new CassandraMsqQueue(msgRepository, ackRepository, unprocessedMsgFilter, queuePartitioner); + } + + @Test + public void msgCanBeSaved() { +// todo-vp: implement + } + + +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java new file mode 100644 index 0000000000..a71a737f74 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/QueuePartitionerTest.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra; + + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.thingsboard.rule.engine.queue.cassandra.repository.ProcessedPartitionRepository; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class QueuePartitionerTest { + + private QueuePartitioner queuePartitioner; + + @Mock + private ProcessedPartitionRepository partitionRepo; + + private Instant startInstant; + private Instant endInstant; + + @Before + public void init() { + queuePartitioner = new QueuePartitioner("MINUTES", partitionRepo); + startInstant = Instant.now(); + endInstant = startInstant.plus(2, ChronoUnit.MINUTES); + queuePartitioner.setClock(Clock.fixed(endInstant, ZoneOffset.UTC)); + } + + @Test + public void partitionCalculated() { + long time = 1519390191425L; + long partition = queuePartitioner.getPartition(time); + assertEquals(1519390140000L, partition); + } + + @Test + public void unprocessedPartitionsReturned() { + UUID nodeId = UUID.randomUUID(); + long clusteredHash = 101L; + when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.of(startInstant.toEpochMilli())); + List actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash); + assertEquals(3, actual.size()); + } + + @Test + public void defaultShiftUsedIfNoPartitionWasProcessed() { + UUID nodeId = UUID.randomUUID(); + long clusteredHash = 101L; + when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty()); + List actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash); + assertEquals(1011, actual.size()); + } + +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java new file mode 100644 index 0000000000..dec8e35b2b --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/UnprocessedMsgFilterTest.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra; + +import com.google.common.collect.Lists; +import org.junit.Test; +import org.thingsboard.rule.engine.api.TbMsg; + +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class UnprocessedMsgFilterTest { + + private UnprocessedMsgFilter msgFilter = new UnprocessedMsgFilter(); + + @Test + public void acknowledgedMsgsAreFilteredOut() { + UUID id1 = UUID.randomUUID(); + UUID id2 = UUID.randomUUID(); + TbMsg msg1 = new TbMsg(id1, "T", null, null, null); + TbMsg msg2 = new TbMsg(id2, "T", null, null, null); + List msgs = Lists.newArrayList(msg1, msg2); + List acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L)); + Collection actual = msgFilter.filter(msgs, acks); + assertEquals(1, actual.size()); + assertEquals(msg1, actual.iterator().next()); + } + +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java new file mode 100644 index 0000000000..38b7b9de8f --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.rule.engine.queue.cassandra.MsgAck; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CassandraAckRepositoryTest extends SimpleAbstractCassandraDaoTest { + + private CassandraAckRepository ackRepository; + + @Before + public void init() { + ackRepository = new CassandraAckRepository(cassandraUnit.session, 1); + } + + @Test + public void acksInPartitionCouldBeFound() { + UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9"); + + List extectedAcks = Lists.newArrayList( + new MsgAck(UUID.fromString("bebaeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L), + new MsgAck(UUID.fromString("12baeb60-1888-11e8-bf21-65b5d5335ba9"), nodeId, 101L, 300L) + ); + + List actualAcks = ackRepository.findAcks(nodeId, 101L, 300L); + assertEquals(extectedAcks, actualAcks); + } + + @Test + public void ackCanBeSavedAndRead() throws ExecutionException, InterruptedException { + UUID msgId = UUIDs.timeBased(); + UUID nodeId = UUIDs.timeBased(); + MsgAck ack = new MsgAck(msgId, nodeId, 10L, 20L); + ListenableFuture future = ackRepository.ack(ack); + future.get(); + List actualAcks = ackRepository.findAcks(nodeId, 10L, 20L); + assertEquals(1, actualAcks.size()); + assertEquals(ack, actualAcks.get(0)); + } + + @Test + public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException { + UUID msgId = UUIDs.timeBased(); + UUID nodeId = UUIDs.timeBased(); + MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L); + ListenableFuture future = ackRepository.ack(ack); + future.get(); + List actualAcks = ackRepository.findAcks(nodeId, 30L, 40L); + assertEquals(1, actualAcks.size()); + TimeUnit.SECONDS.sleep(2); + assertTrue(ackRepository.findAcks(nodeId, 30L, 40L).isEmpty()); + } + + +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java new file mode 100644 index 0000000000..a0827fc5ac --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java @@ -0,0 +1,82 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +//import static org.junit.jupiter.api.Assertions.*; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.rule.engine.api.TbMsg; +import org.thingsboard.rule.engine.api.TbMsgMetaData; +import org.thingsboard.server.common.data.id.DeviceId; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CassandraMsgRepositoryTest extends SimpleAbstractCassandraDaoTest { + + private CassandraMsgRepository msgRepository; + + @Before + public void init() { + msgRepository = new CassandraMsgRepository(cassandraUnit.session, 1); + } + + @Test + public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException { + TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]); + UUID nodeId = UUIDs.timeBased(); + ListenableFuture future = msgRepository.save(msg, nodeId, 1L, 1L, 1L); + future.get(); + List msgs = msgRepository.findMsgs(nodeId, 1L, 1L); + assertEquals(1, msgs.size()); + } + + @Test + public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException { + TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]); + UUID nodeId = UUIDs.timeBased(); + ListenableFuture future = msgRepository.save(msg, nodeId, 2L, 2L, 2L); + future.get(); + List msgs = msgRepository.findMsgs(nodeId, 2L, 2L); + assertEquals(1, msgs.size()); + TimeUnit.SECONDS.sleep(2); + assertTrue(msgRepository.findMsgs(nodeId, 2L, 2L).isEmpty()); + } + + @Test + public void protoBufConverterWorkAsExpected() throws ExecutionException, InterruptedException { + TbMsgMetaData metaData = new TbMsgMetaData(); + metaData.putValue("key", "value"); + String dataStr = "someContent"; + TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, dataStr.getBytes()); + UUID nodeId = UUIDs.timeBased(); + ListenableFuture future = msgRepository.save(msg, nodeId, 1L, 1L, 1L); + future.get(); + List msgs = msgRepository.findMsgs(nodeId, 1L, 1L); + assertEquals(1, msgs.size()); + assertEquals(msg, msgs.get(0)); + } + + +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java new file mode 100644 index 0000000000..a731452f24 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java @@ -0,0 +1,80 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class CassandraProcessedPartitionRepositoryTest extends SimpleAbstractCassandraDaoTest { + + private CassandraProcessedPartitionRepository partitionRepository; + + @Before + public void init() { + partitionRepository = new CassandraProcessedPartitionRepository(cassandraUnit.session, 1); + } + + @Test + public void lastProcessedPartitionCouldBeFound() { + UUID nodeId = UUID.fromString("055eee50-1883-11e8-b380-65b5d5335ba9"); + Optional lastProcessedPartition = partitionRepository.findLastProcessedPartition(nodeId, 101L); + assertTrue(lastProcessedPartition.isPresent()); + assertEquals((Long) 777L, lastProcessedPartition.get()); + } + + @Test + public void highestProcessedPartitionReturned() throws ExecutionException, InterruptedException { + UUID nodeId = UUIDs.timeBased(); + ListenableFuture future1 = partitionRepository.partitionProcessed(nodeId, 303L, 100L); + ListenableFuture future2 = partitionRepository.partitionProcessed(nodeId, 303L, 200L); + ListenableFuture future3 = partitionRepository.partitionProcessed(nodeId, 303L, 10L); + ListenableFuture> allFutures = Futures.allAsList(future1, future2, future3); + allFutures.get(); + Optional actual = partitionRepository.findLastProcessedPartition(nodeId, 303L); + assertTrue(actual.isPresent()); + assertEquals((Long) 200L, actual.get()); + } + + @Test + public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException { + UUID nodeId = UUIDs.timeBased(); + ListenableFuture future = partitionRepository.partitionProcessed(nodeId, 404L, 10L); + future.get(); + Optional actual = partitionRepository.findLastProcessedPartition(nodeId, 404L); + assertEquals((Long) 10L, actual.get()); + TimeUnit.SECONDS.sleep(2); + assertFalse(partitionRepository.findLastProcessedPartition(nodeId, 404L).isPresent()); + } + + @Test + public void ifNoPartitionsWereProcessedEmptyResultReturned() { + UUID nodeId = UUIDs.timeBased(); + Optional actual = partitionRepository.findLastProcessedPartition(nodeId, 505L); + assertFalse(actual.isPresent()); + } + +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDaoTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDaoTest.java new file mode 100644 index 0000000000..1f248e8c90 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/queue/cassandra/repository/impl/SimpleAbstractCassandraDaoTest.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.queue.cassandra.repository.impl; + +import org.cassandraunit.CassandraCQLUnit; +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.ClassRule; + + +public abstract class SimpleAbstractCassandraDaoTest { + + @ClassRule + public static CassandraCQLUnit cassandraUnit = new CassandraCQLUnit( + new ClassPathCQLDataSet("cassandra/system-test.cql", "thingsboard")); + + +} \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql b/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql new file mode 100644 index 0000000000..7ba9f26d79 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/resources/cassandra/system-test.cql @@ -0,0 +1,75 @@ +CREATE TABLE IF NOT EXISTS thingsboard.msg_queue ( + node_id timeuuid, + clustered_hash bigint, + partition bigint, + ts bigint, + msg blob, + PRIMARY KEY ((node_id, clustered_hash, partition), ts)) +WITH CLUSTERING ORDER BY (ts DESC) +AND compaction = { + 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', + 'min_threshold': '5', + 'base_time_seconds': '43200', + 'max_window_size_seconds': '43200', + 'tombstone_threshold': '0.9', + 'unchecked_tombstone_compaction': 'true' +}; + + +CREATE TABLE IF NOT EXISTS thingsboard.msg_ack_queue ( + node_id timeuuid, + clustered_hash bigint, + partition bigint, + msg_id timeuuid, + PRIMARY KEY ((node_id, clustered_hash, partition), msg_id)) +WITH CLUSTERING ORDER BY (msg_id DESC) +AND compaction = { + 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', + 'min_threshold': '5', + 'base_time_seconds': '43200', + 'max_window_size_seconds': '43200', + 'tombstone_threshold': '0.9', + 'unchecked_tombstone_compaction': 'true' +}; + +CREATE TABLE IF NOT EXISTS thingsboard.processed_msg_partitions ( + node_id timeuuid, + clustered_hash bigint, + partition bigint, + PRIMARY KEY ((node_id, clustered_hash), partition)) +WITH CLUSTERING ORDER BY (partition DESC) +AND compaction = { + 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', + 'min_threshold': '5', + 'base_time_seconds': '43200', + 'max_window_size_seconds': '43200', + 'tombstone_threshold': '0.9', + 'unchecked_tombstone_compaction': 'true' +}; + + + +-- msg_queue dataset + +INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 201, null); +INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 202, null); +INSERT INTO thingsboard.msg_queue (node_id, clustered_hash, partition, ts, msg) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 301, null); + +-- ack_queue dataset +INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, bebaeb60-1888-11e8-bf21-65b5d5335ba9); +INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 300, 12baeb60-1888-11e8-bf21-65b5d5335ba9); + INSERT INTO msg_ack_queue (node_id, clustered_hash, partition, msg_id) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 200, 32baeb60-1888-11e8-bf21-65b5d5335ba9); + +-- processed partition dataset +INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 100); +INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 101, 777); +INSERT INTO processed_msg_partitions (node_id, clustered_hash, partition) + VALUES (055eee50-1883-11e8-b380-65b5d5335ba9, 202, 200); \ No newline at end of file