From b86a1546e8a9e77e86f41e166324a0f1fd63dbeb Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 1 Apr 2022 13:33:11 +0200 Subject: [PATCH] created queue routing service --- .../queue/DefaultQueueRoutingInfoService.java | 54 ++++++++++++++++ .../transport/DefaultTransportApiService.java | 36 +++++++++++ common/cluster-api/src/main/proto/queue.proto | 37 +++++++++++ .../queue/discovery/HashPartitionService.java | 2 - .../queue/discovery/QueueRoutingInfo.java | 54 ++++++++++++++++ .../discovery/QueueRoutingInfoService.java | 29 +++++++++ .../common/transport/TransportService.java | 7 ++ .../service/DefaultTransportService.java | 36 +++++++++++ .../TransportQueueRoutingInfoService.java | 64 +++++++++++++++++++ 9 files changed, 317 insertions(+), 2 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/queue/DefaultQueueRoutingInfoService.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfo.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfoService.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportQueueRoutingInfoService.java diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultQueueRoutingInfoService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultQueueRoutingInfoService.java new file mode 100644 index 0000000000..df22d42e62 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultQueueRoutingInfoService.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.queue.QueueService; +import org.thingsboard.server.queue.discovery.QueueRoutingInfo; +import org.thingsboard.server.queue.discovery.QueueRoutingInfoService; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Service +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine'") +public class DefaultQueueRoutingInfoService implements QueueRoutingInfoService { + + private final QueueService queueService; + + public DefaultQueueRoutingInfoService(QueueService queueService) { + this.queueService = queueService; + } + + @Override + public List getAllQueuesRoutingInfo() { + return queueService.findAllQueues().stream().map(QueueRoutingInfo::new).collect(Collectors.toList()); + } + + @Override + public List getMainQueuesRoutingInfo() { + return queueService.findAllMainQueues().stream().map(QueueRoutingInfo::new).collect(Collectors.toList()); + } + + @Override + public List getQueuesRoutingInfo(TenantId tenantId) { + return queueService.findQueuesByTenantId(tenantId).stream().map(QueueRoutingInfo::new).collect(Collectors.toList()); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 5fda933234..0cba1ec325 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -57,6 +57,7 @@ import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.ota.OtaPackageUtil; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; @@ -72,6 +73,7 @@ import org.thingsboard.server.dao.device.provision.ProvisionFailedException; import org.thingsboard.server.dao.device.provision.ProvisionRequest; import org.thingsboard.server.dao.device.provision.ProvisionResponse; import org.thingsboard.server.dao.ota.OtaPackageService; +import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; @@ -99,6 +101,7 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.resource.TbResourceService; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -134,6 +137,7 @@ public class DefaultTransportApiService implements TransportApiService { private final TbResourceService resourceService; private final OtaPackageService otaPackageService; private final OtaPackageDataCache otaPackageDataCache; + private final QueueService queueService; private final ConcurrentMap deviceCreationLocks = new ConcurrentHashMap<>(); @@ -176,6 +180,12 @@ public class DefaultTransportApiService implements TransportApiService { result = handle(transportApiRequestMsg.getDeviceCredentialsRequestMsg()); } else if (transportApiRequestMsg.hasOtaPackageRequestMsg()) { result = handle(transportApiRequestMsg.getOtaPackageRequestMsg()); + } else if (transportApiRequestMsg.hasGetAllMainQueueRoutingInfoRequestMsg()) { + return Futures.transform(handle(transportApiRequestMsg.getGetAllMainQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); + } else if (transportApiRequestMsg.hasGetTenantQueueRoutingInfoRequestMsg()) { + return Futures.transform(handle(transportApiRequestMsg.getGetTenantQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); + } else if (transportApiRequestMsg.hasGetAllQueueRoutingInfoRequestMsg()) { + return Futures.transform(handle(transportApiRequestMsg.getGetAllQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); } return Futures.transform(Optional.ofNullable(result).orElseGet(this::getEmptyTransportApiResponseFuture), @@ -636,6 +646,32 @@ public class DefaultTransportApiService implements TransportApiService { } } + private ListenableFuture handle(TransportProtos.GetAllMainQueueRoutingInfoRequestMsg requestMsg) { + return queuesToTransportApiResponseMsg(queueService.findAllMainQueues()); + } + + private ListenableFuture handle(TransportProtos.GetAllQueueRoutingInfoRequestMsg requestMsg) { + return queuesToTransportApiResponseMsg(queueService.findAllQueues()); + } + + private ListenableFuture handle(TransportProtos.GetTenantQueueRoutingInfoRequestMsg requestMsg) { + TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB())); + return queuesToTransportApiResponseMsg(queueService.findQueuesByTenantId(tenantId)); + } + + private ListenableFuture queuesToTransportApiResponseMsg(List queues) { + return Futures.immediateFuture(TransportApiResponseMsg.newBuilder() + .addAllGetQueueRoutingInfoResponseMsgs(queues.stream() + .map(queue -> TransportProtos.GetQueueRoutingInfoResponseMsg.newBuilder() + .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) + .setQueueName(queue.getName()) + .setQueueTopic(queue.getTopic()) + .setPartitions(queue.getPartitions()) + .build()).collect(Collectors.toList())).build()); + } + + private Long checkLong(Long l) { return l != null ? l : 0; } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index a9d517f4dd..1084cb2e44 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -197,6 +197,39 @@ message GetEntityProfileRequestMsg { int64 entityIdLSB = 3; } +message GetAllMainQueueRoutingInfoRequestMsg { +} + +message GetAllQueueRoutingInfoRequestMsg { +} + +message GetTenantQueueRoutingInfoRequestMsg { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; +} + +message GetQueueRoutingInfoResponseMsg { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + string queueName = 3; + string queueTopic = 4; + int32 partitions = 5; +} + +message QueueUpdateMsg { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + string queueName = 3; + string queueTopic = 4; + int32 partitions = 5; +} + +message QueueDeleteMsg { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + string queueName = 3; +} + message LwM2MRegistrationRequestMsg { string tenantId = 1; string endpoint = 2; @@ -673,6 +706,9 @@ message TransportApiRequestMsg { GetSnmpDevicesRequestMsg snmpDevicesRequestMsg = 11; GetDeviceRequestMsg deviceRequestMsg = 12; GetDeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 13; + GetAllMainQueueRoutingInfoRequestMsg GetAllMainQueueRoutingInfoRequestMsg = 14; + GetTenantQueueRoutingInfoRequestMsg getTenantQueueRoutingInfoRequestMsg = 15; + GetAllQueueRoutingInfoRequestMsg getAllQueueRoutingInfoRequestMsg = 16; } /* Response from ThingsBoard Core Service to Transport Service */ @@ -687,6 +723,7 @@ message TransportApiResponseMsg { GetOtaPackageResponseMsg otaPackageResponseMsg = 8; GetDeviceResponseMsg deviceResponseMsg = 9; GetDeviceCredentialsResponseMsg deviceCredentialsResponseMsg = 10; + repeated GetQueueRoutingInfoResponseMsg getQueueRoutingInfoResponseMsgs = 11; } /* Messages that are handled by ThingsBoard Core Service */ diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 96df34186c..4e27a01760 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -72,8 +72,6 @@ public class HashPartitionService implements PartitionService { private ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); private ConcurrentMap tpiCache = new ConcurrentHashMap<>(); - private Map tbCoreNotificationTopics = new HashMap<>(); - private Map tbRuleEngineNotificationTopics = new HashMap<>(); private Map> tbTransportServicesByType = new HashMap<>(); private List currentOtherServices; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfo.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfo.java new file mode 100644 index 0000000000..8afecc4413 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfo.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.queue.Queue; +import org.thingsboard.server.gen.transport.TransportProtos.GetQueueRoutingInfoResponseMsg; + +import java.util.UUID; + + +@Data +public class QueueRoutingInfo { + + private final TenantId tenantId; + private final String queueName; + private final String queueTopic; + private final int partitions; + + public QueueRoutingInfo(TenantId tenantId, String queueName, String queueTopic, int partitions) { + this.tenantId = tenantId; + this.queueName = queueName; + this.queueTopic = queueTopic; + this.partitions = partitions; + } + + public QueueRoutingInfo(Queue queue) { + this.tenantId = queue.getTenantId(); + this.queueName = queue.getName(); + this.queueTopic = queue.getTopic(); + this.partitions = queue.getPartitions(); + } + + public QueueRoutingInfo(GetQueueRoutingInfoResponseMsg routingInfo) { + this.tenantId = new TenantId(new UUID(routingInfo.getTenantIdMSB(), routingInfo.getTenantIdLSB())); + this.queueName = routingInfo.getQueueName(); + this.queueTopic = routingInfo.getQueueTopic(); + this.partitions = routingInfo.getPartitions(); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfoService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfoService.java new file mode 100644 index 0000000000..366452d9e8 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfoService.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery; + +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.List; + +public interface QueueRoutingInfoService { + + List getAllQueuesRoutingInfo(); + + List getMainQueuesRoutingInfo(); + + List getQueuesRoutingInfo(TenantId tenantId); +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 6c788d93f3..53c1997f5c 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -57,6 +57,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -67,6 +68,12 @@ public interface TransportService { GetEntityProfileResponseMsg getEntityProfile(GetEntityProfileRequestMsg msg); + List getQueueRoutingInfo(TransportProtos.GetAllMainQueueRoutingInfoRequestMsg msg); + + List getQueueRoutingInfo(TransportProtos.GetTenantQueueRoutingInfoRequestMsg msg); + + List getQueueRoutingInfo(TransportProtos.GetAllQueueRoutingInfoRequestMsg msg); + GetResourceResponseMsg getResource(GetResourceRequestMsg msg); GetSnmpDevicesResponseMsg getSnmpDevicesIds(GetSnmpDevicesRequestMsg requestMsg); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 77738a1e9d..3de2a6a7d5 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -304,6 +304,42 @@ public class DefaultTransportService implements TransportService { } } + @Override + public List getQueueRoutingInfo(TransportProtos.GetAllQueueRoutingInfoRequestMsg msg) { + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build()); + try { + TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); + return response.getValue().getGetQueueRoutingInfoResponseMsgsList(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public List getQueueRoutingInfo(TransportProtos.GetAllMainQueueRoutingInfoRequestMsg msg) { + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetAllMainQueueRoutingInfoRequestMsg(msg).build()); + try { + TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); + return response.getValue().getGetQueueRoutingInfoResponseMsgsList(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public List getQueueRoutingInfo(TransportProtos.GetTenantQueueRoutingInfoRequestMsg msg) { + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetTenantQueueRoutingInfoRequestMsg(msg).build()); + try { + TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); + return response.getValue().getGetQueueRoutingInfoResponseMsgsList(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + @Override public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) { TbProtoQueueMsg protoMsg = diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportQueueRoutingInfoService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportQueueRoutingInfoService.java new file mode 100644 index 0000000000..bcdc07b5b0 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportQueueRoutingInfoService.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.gen.transport.TransportProtos.GetAllQueueRoutingInfoRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAllMainQueueRoutingInfoRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetTenantQueueRoutingInfoRequestMsg; +import org.thingsboard.server.queue.discovery.QueueRoutingInfo; +import org.thingsboard.server.queue.discovery.QueueRoutingInfoService; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Service +@ConditionalOnExpression("'${service.type:null}'=='tb-transport'") +public class TransportQueueRoutingInfoService implements QueueRoutingInfoService { + + private TransportService transportService; + + @Lazy + @Autowired + public void setTransportService(TransportService transportService) { + this.transportService = transportService; + } + + @Override + public List getAllQueuesRoutingInfo() { + GetAllQueueRoutingInfoRequestMsg msg = GetAllQueueRoutingInfoRequestMsg.newBuilder().build(); + return transportService.getQueueRoutingInfo(msg).stream().map(QueueRoutingInfo::new).collect(Collectors.toList()); + } + + @Override + public List getMainQueuesRoutingInfo() { + GetAllMainQueueRoutingInfoRequestMsg msg = GetAllMainQueueRoutingInfoRequestMsg.newBuilder().build(); + return transportService.getQueueRoutingInfo(msg).stream().map(QueueRoutingInfo::new).collect(Collectors.toList()); + } + + @Override + public List getQueuesRoutingInfo(TenantId tenantId) { + GetTenantQueueRoutingInfoRequestMsg msg = GetTenantQueueRoutingInfoRequestMsg.newBuilder().build(); + return transportService.getQueueRoutingInfo(msg).stream().map(QueueRoutingInfo::new).collect(Collectors.toList()); + } +}