diff --git a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java similarity index 77% rename from application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java rename to application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 9df6c16111..6815827353 100644 --- a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.cluster.routing; +package org.thingsboard.server.queue.discovery; import com.datastax.oss.driver.api.core.uuid.Uuids; import lombok.extern.slf4j.Slf4j; @@ -29,10 +29,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.discovery.HashPartitionService; -import org.thingsboard.server.queue.discovery.QueueRoutingInfoService; -import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; import java.util.ArrayList; import java.util.Collections; @@ -111,13 +107,45 @@ public class HashPartitionServiceTest { map.put(partition, map.getOrDefault(partition, 0) + 1); } - List> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); + printDispersion(start, map); + } + + @Test + public void testDispersionOnResolveByPartitionIdx() { + int serverCount = 10; + int queueCount = 1000; + int partitionCount = 3; + + List services = new ArrayList<>(); + + for (int i = 0; i < serverCount; i++) { + services.add(TransportProtos.ServiceInfo.newBuilder().setServiceId("RE-" + i).build()); + } + + long start = System.currentTimeMillis(); + Map map = new HashMap<>(); + services.forEach(s -> map.put(s.getServiceId(), 0)); + + for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) { + for (int partition = 0; partition < partitionCount; partition++) { + TopicPartitionInfo tpi = new TopicPartitionInfo("tb_rule_engine.queue_" + queueIndex, TenantId.SYS_TENANT_ID, partition, false); + TransportProtos.ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, tpi); + String serviceId = serviceInfo.getServiceId(); + map.put(serviceId, map.get(serviceId) + 1); + } + } + + printDispersion(start, map); + } + + private void printDispersion(long start, Map map) { + List> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); long end = System.currentTimeMillis(); double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue()); double diffPercent = (diff / ITERATIONS) * 100.0; System.out.println("Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)"); Assert.assertTrue(diffPercent < 0.5); - for (Map.Entry entry : data) { + for (Map.Entry entry : data) { System.out.println(entry.getKey() + ": " + entry.getValue()); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index afbd96625d..941abcb5b0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -253,8 +253,12 @@ public class HashPartitionService implements PartitionService { ConcurrentMap> oldPartitions = myPartitions; myPartitions = new ConcurrentHashMap<>(); partitionSizesMap.forEach((queueKey, size) -> { + TenantId tenantId = queueKey.getTenantId(); + String topic = partitionTopicsMap.get(queueKey); + for (int i = 0; i < size; i++) { - ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), i); + TopicPartitionInfo tpi = new TopicPartitionInfo(topic, tenantId, i, false); + ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), tpi); if (currentService.equals(serviceInfo)) { myPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); } @@ -434,11 +438,14 @@ public class HashPartitionService implements PartitionService { } } - private ServiceInfo resolveByPartitionIdx(List servers, Integer partitionIdx) { + protected ServiceInfo resolveByPartitionIdx(List servers, TopicPartitionInfo tpi) { if (servers == null || servers.isEmpty()) { return null; } - return servers.get(partitionIdx % servers.size()); + + int hash = hashFunction.newHasher().putInt(tpi.hashCode()).hash().asInt(); + + return servers.get(Math.abs(hash % servers.size())); } public static HashFunction forName(String name) {