Browse Source

created queue routing service

pull/6134/head
YevhenBondarenko 4 years ago
parent
commit
b86a1546e8
  1. 54
      application/src/main/java/org/thingsboard/server/service/queue/DefaultQueueRoutingInfoService.java
  2. 36
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  3. 37
      common/cluster-api/src/main/proto/queue.proto
  4. 2
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java
  5. 54
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfo.java
  6. 29
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfoService.java
  7. 7
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
  8. 36
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  9. 64
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportQueueRoutingInfoService.java

54
application/src/main/java/org/thingsboard/server/service/queue/DefaultQueueRoutingInfoService.java

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.queue.discovery.QueueRoutingInfo;
import org.thingsboard.server.queue.discovery.QueueRoutingInfoService;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine'")
public class DefaultQueueRoutingInfoService implements QueueRoutingInfoService {
private final QueueService queueService;
public DefaultQueueRoutingInfoService(QueueService queueService) {
this.queueService = queueService;
}
@Override
public List<QueueRoutingInfo> getAllQueuesRoutingInfo() {
return queueService.findAllQueues().stream().map(QueueRoutingInfo::new).collect(Collectors.toList());
}
@Override
public List<QueueRoutingInfo> getMainQueuesRoutingInfo() {
return queueService.findAllMainQueues().stream().map(QueueRoutingInfo::new).collect(Collectors.toList());
}
@Override
public List<QueueRoutingInfo> getQueuesRoutingInfo(TenantId tenantId) {
return queueService.findQueuesByTenantId(tenantId).stream().map(QueueRoutingInfo::new).collect(Collectors.toList());
}
}

36
application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -57,6 +57,7 @@ import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
@ -72,6 +73,7 @@ import org.thingsboard.server.dao.device.provision.ProvisionFailedException;
import org.thingsboard.server.dao.device.provision.ProvisionRequest;
import org.thingsboard.server.dao.device.provision.ProvisionResponse;
import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -99,6 +101,7 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.resource.TbResourceService;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -134,6 +137,7 @@ public class DefaultTransportApiService implements TransportApiService {
private final TbResourceService resourceService;
private final OtaPackageService otaPackageService;
private final OtaPackageDataCache otaPackageDataCache;
private final QueueService queueService;
private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>();
@ -176,6 +180,12 @@ public class DefaultTransportApiService implements TransportApiService {
result = handle(transportApiRequestMsg.getDeviceCredentialsRequestMsg());
} else if (transportApiRequestMsg.hasOtaPackageRequestMsg()) {
result = handle(transportApiRequestMsg.getOtaPackageRequestMsg());
} else if (transportApiRequestMsg.hasGetAllMainQueueRoutingInfoRequestMsg()) {
return Futures.transform(handle(transportApiRequestMsg.getGetAllMainQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
} else if (transportApiRequestMsg.hasGetTenantQueueRoutingInfoRequestMsg()) {
return Futures.transform(handle(transportApiRequestMsg.getGetTenantQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
} else if (transportApiRequestMsg.hasGetAllQueueRoutingInfoRequestMsg()) {
return Futures.transform(handle(transportApiRequestMsg.getGetAllQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
}
return Futures.transform(Optional.ofNullable(result).orElseGet(this::getEmptyTransportApiResponseFuture),
@ -636,6 +646,32 @@ public class DefaultTransportApiService implements TransportApiService {
}
}
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetAllMainQueueRoutingInfoRequestMsg requestMsg) {
return queuesToTransportApiResponseMsg(queueService.findAllMainQueues());
}
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetAllQueueRoutingInfoRequestMsg requestMsg) {
return queuesToTransportApiResponseMsg(queueService.findAllQueues());
}
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetTenantQueueRoutingInfoRequestMsg requestMsg) {
TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB()));
return queuesToTransportApiResponseMsg(queueService.findQueuesByTenantId(tenantId));
}
private ListenableFuture<TransportApiResponseMsg> queuesToTransportApiResponseMsg(List<Queue> queues) {
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder()
.addAllGetQueueRoutingInfoResponseMsgs(queues.stream()
.map(queue -> TransportProtos.GetQueueRoutingInfoResponseMsg.newBuilder()
.setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits())
.setQueueName(queue.getName())
.setQueueTopic(queue.getTopic())
.setPartitions(queue.getPartitions())
.build()).collect(Collectors.toList())).build());
}
private Long checkLong(Long l) {
return l != null ? l : 0;
}

37
common/cluster-api/src/main/proto/queue.proto

@ -197,6 +197,39 @@ message GetEntityProfileRequestMsg {
int64 entityIdLSB = 3;
}
message GetAllMainQueueRoutingInfoRequestMsg {
}
message GetAllQueueRoutingInfoRequestMsg {
}
message GetTenantQueueRoutingInfoRequestMsg {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
}
message GetQueueRoutingInfoResponseMsg {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string queueName = 3;
string queueTopic = 4;
int32 partitions = 5;
}
message QueueUpdateMsg {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string queueName = 3;
string queueTopic = 4;
int32 partitions = 5;
}
message QueueDeleteMsg {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string queueName = 3;
}
message LwM2MRegistrationRequestMsg {
string tenantId = 1;
string endpoint = 2;
@ -673,6 +706,9 @@ message TransportApiRequestMsg {
GetSnmpDevicesRequestMsg snmpDevicesRequestMsg = 11;
GetDeviceRequestMsg deviceRequestMsg = 12;
GetDeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 13;
GetAllMainQueueRoutingInfoRequestMsg GetAllMainQueueRoutingInfoRequestMsg = 14;
GetTenantQueueRoutingInfoRequestMsg getTenantQueueRoutingInfoRequestMsg = 15;
GetAllQueueRoutingInfoRequestMsg getAllQueueRoutingInfoRequestMsg = 16;
}
/* Response from ThingsBoard Core Service to Transport Service */
@ -687,6 +723,7 @@ message TransportApiResponseMsg {
GetOtaPackageResponseMsg otaPackageResponseMsg = 8;
GetDeviceResponseMsg deviceResponseMsg = 9;
GetDeviceCredentialsResponseMsg deviceCredentialsResponseMsg = 10;
repeated GetQueueRoutingInfoResponseMsg getQueueRoutingInfoResponseMsgs = 11;
}
/* Messages that are handled by ThingsBoard Core Service */

2
common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java

@ -72,8 +72,6 @@ public class HashPartitionService implements PartitionService {
private ConcurrentMap<ServiceQueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
private ConcurrentMap<TopicPartitionInfoKey, TopicPartitionInfo> tpiCache = new ConcurrentHashMap<>();
private Map<String, TopicPartitionInfo> tbCoreNotificationTopics = new HashMap<>();
private Map<String, TopicPartitionInfo> tbRuleEngineNotificationTopics = new HashMap<>();
private Map<String, List<ServiceInfo>> tbTransportServicesByType = new HashMap<>();
private List<ServiceInfo> currentOtherServices;

54
common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfo.java

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.discovery;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.gen.transport.TransportProtos.GetQueueRoutingInfoResponseMsg;
import java.util.UUID;
@Data
public class QueueRoutingInfo {
private final TenantId tenantId;
private final String queueName;
private final String queueTopic;
private final int partitions;
public QueueRoutingInfo(TenantId tenantId, String queueName, String queueTopic, int partitions) {
this.tenantId = tenantId;
this.queueName = queueName;
this.queueTopic = queueTopic;
this.partitions = partitions;
}
public QueueRoutingInfo(Queue queue) {
this.tenantId = queue.getTenantId();
this.queueName = queue.getName();
this.queueTopic = queue.getTopic();
this.partitions = queue.getPartitions();
}
public QueueRoutingInfo(GetQueueRoutingInfoResponseMsg routingInfo) {
this.tenantId = new TenantId(new UUID(routingInfo.getTenantIdMSB(), routingInfo.getTenantIdLSB()));
this.queueName = routingInfo.getQueueName();
this.queueTopic = routingInfo.getQueueTopic();
this.partitions = routingInfo.getPartitions();
}
}

29
common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueRoutingInfoService.java

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.discovery;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.List;
public interface QueueRoutingInfoService {
List<QueueRoutingInfo> getAllQueuesRoutingInfo();
List<QueueRoutingInfo> getMainQueuesRoutingInfo();
List<QueueRoutingInfo> getQueuesRoutingInfo(TenantId tenantId);
}

7
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java

@ -57,6 +57,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@ -67,6 +68,12 @@ public interface TransportService {
GetEntityProfileResponseMsg getEntityProfile(GetEntityProfileRequestMsg msg);
List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetAllMainQueueRoutingInfoRequestMsg msg);
List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetTenantQueueRoutingInfoRequestMsg msg);
List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetAllQueueRoutingInfoRequestMsg msg);
GetResourceResponseMsg getResource(GetResourceRequestMsg msg);
GetSnmpDevicesResponseMsg getSnmpDevicesIds(GetSnmpDevicesRequestMsg requestMsg);

36
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -304,6 +304,42 @@ public class DefaultTransportService implements TransportService {
}
}
@Override
public List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetAllQueueRoutingInfoRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetAllQueueRoutingInfoRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getGetQueueRoutingInfoResponseMsgsList();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
public List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetAllMainQueueRoutingInfoRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetAllMainQueueRoutingInfoRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getGetQueueRoutingInfoResponseMsgsList();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
public List<TransportProtos.GetQueueRoutingInfoResponseMsg> getQueueRoutingInfo(TransportProtos.GetTenantQueueRoutingInfoRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetTenantQueueRoutingInfoRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getGetQueueRoutingInfoResponseMsgsList();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =

64
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportQueueRoutingInfoService.java

@ -0,0 +1,64 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.gen.transport.TransportProtos.GetAllQueueRoutingInfoRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAllMainQueueRoutingInfoRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetTenantQueueRoutingInfoRequestMsg;
import org.thingsboard.server.queue.discovery.QueueRoutingInfo;
import org.thingsboard.server.queue.discovery.QueueRoutingInfoService;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='tb-transport'")
public class TransportQueueRoutingInfoService implements QueueRoutingInfoService {
private TransportService transportService;
@Lazy
@Autowired
public void setTransportService(TransportService transportService) {
this.transportService = transportService;
}
@Override
public List<QueueRoutingInfo> getAllQueuesRoutingInfo() {
GetAllQueueRoutingInfoRequestMsg msg = GetAllQueueRoutingInfoRequestMsg.newBuilder().build();
return transportService.getQueueRoutingInfo(msg).stream().map(QueueRoutingInfo::new).collect(Collectors.toList());
}
@Override
public List<QueueRoutingInfo> getMainQueuesRoutingInfo() {
GetAllMainQueueRoutingInfoRequestMsg msg = GetAllMainQueueRoutingInfoRequestMsg.newBuilder().build();
return transportService.getQueueRoutingInfo(msg).stream().map(QueueRoutingInfo::new).collect(Collectors.toList());
}
@Override
public List<QueueRoutingInfo> getQueuesRoutingInfo(TenantId tenantId) {
GetTenantQueueRoutingInfoRequestMsg msg = GetTenantQueueRoutingInfoRequestMsg.newBuilder().build();
return transportService.getQueueRoutingInfo(msg).stream().map(QueueRoutingInfo::new).collect(Collectors.toList());
}
}
Loading…
Cancel
Save