From be608445eb9960c3e198596b3eeeeda837ee4857 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 4 Jul 2024 13:35:53 +0200 Subject: [PATCH] minor improvements due to comments --- .../queue/DefaultTbClusterService.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 354e8dce04..eabec663a3 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -359,7 +359,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) { if (resource.getResourceType() == ResourceType.LWM2M_MODEL) { - log.trace("[{}] Processing delete resource", resource); + log.trace("[{}][{}][{}] Processing delete resource", resource.getTenantId(), resource.getResourceType(), resource.getResourceKey()); TransportProtos.ResourceDeleteMsg resourceDeleteMsg = TransportProtos.ResourceDeleteMsg.newBuilder() .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits()) @@ -390,8 +390,19 @@ public class DefaultTbClusterService implements TbClusterService { } private void broadcast(ToTransportMsg transportMsg, TbQueueCallback callback) { - TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); Set tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT); + broadcast(transportMsg, tbTransportServices, callback); + } + + private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) { + Set tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT).stream() + .filter(info -> info.getTransportsList().contains(transportType)) + .map(TransportProtos.ServiceInfo::getServiceId).collect(Collectors.toSet()); + broadcast(transportMsg, tbTransportServices, callback); + } + + private void broadcast(ToTransportMsg transportMsg, Set tbTransportServices, TbQueueCallback callback) { + TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportServices.size(), callback) : null; for (String transportServiceId : tbTransportServices) { TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); @@ -400,20 +411,6 @@ public class DefaultTbClusterService implements TbClusterService { } } - private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) { - TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); - Set tbTransportInfos = partitionService.getAllServices(ServiceType.TB_TRANSPORT); - TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportInfos.size(), callback) : null; - tbTransportInfos.stream() - .filter(info -> info.getTransportsList().contains(transportType)) - .map(TransportProtos.ServiceInfo::getServiceId) - .forEach(transportServiceId -> { - TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); - toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), proxyCallback); - toTransportNfs.incrementAndGet(); - }); - } - @Override public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);