Browse Source

minor improvements due to comments

pull/11151/head
YevhenBondarenko 2 years ago
parent
commit
be608445eb
  1. 29
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

29
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

@ -359,7 +359,7 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) {
if (resource.getResourceType() == ResourceType.LWM2M_MODEL) {
log.trace("[{}] Processing delete resource", resource);
log.trace("[{}][{}][{}] Processing delete resource", resource.getTenantId(), resource.getResourceType(), resource.getResourceKey());
TransportProtos.ResourceDeleteMsg resourceDeleteMsg = TransportProtos.ResourceDeleteMsg.newBuilder()
.setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits())
@ -390,8 +390,19 @@ public class DefaultTbClusterService implements TbClusterService {
}
private void broadcast(ToTransportMsg transportMsg, TbQueueCallback callback) {
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT);
broadcast(transportMsg, tbTransportServices, callback);
}
private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) {
Set<String> tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT).stream()
.filter(info -> info.getTransportsList().contains(transportType))
.map(TransportProtos.ServiceInfo::getServiceId).collect(Collectors.toSet());
broadcast(transportMsg, tbTransportServices, callback);
}
private void broadcast(ToTransportMsg transportMsg, Set<String> tbTransportServices, TbQueueCallback callback) {
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportServices.size(), callback) : null;
for (String transportServiceId : tbTransportServices) {
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId);
@ -400,20 +411,6 @@ public class DefaultTbClusterService implements TbClusterService {
}
}
private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) {
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
Set<TransportProtos.ServiceInfo> tbTransportInfos = partitionService.getAllServices(ServiceType.TB_TRANSPORT);
TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportInfos.size(), callback) : null;
tbTransportInfos.stream()
.filter(info -> info.getTransportsList().contains(transportType))
.map(TransportProtos.ServiceInfo::getServiceId)
.forEach(transportServiceId -> {
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId);
toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), proxyCallback);
toTransportNfs.incrementAndGet();
});
}
@Override
public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);

Loading…
Cancel
Save