28 changed files with 975 additions and 95 deletions
@ -0,0 +1,79 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.azure.servicebus; |
|||
|
|||
import com.microsoft.azure.servicebus.management.ManagementClient; |
|||
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; |
|||
import com.microsoft.azure.servicebus.primitives.ServiceBusException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
|
|||
import javax.annotation.PreDestroy; |
|||
import java.io.IOException; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") |
|||
public class TbServiceBusAdmin implements TbQueueAdmin { |
|||
|
|||
private final Set<String> queues = ConcurrentHashMap.newKeySet(); |
|||
|
|||
private final ManagementClient client; |
|||
|
|||
public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings) { |
|||
ConnectionStringBuilder builder = new ConnectionStringBuilder( |
|||
serviceBusSettings.getNamespaceName(), |
|||
"queues", |
|||
serviceBusSettings.getSasKeyName(), |
|||
serviceBusSettings.getSasKey()); |
|||
|
|||
client = new ManagementClient(builder); |
|||
|
|||
try { |
|||
client.getQueues().forEach(queueDescription -> queues.add(queueDescription.getPath())); |
|||
} catch (ServiceBusException | InterruptedException e) { |
|||
log.error("Failed to get queues.", e); |
|||
throw new RuntimeException("Failed to get queues.", e); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void createTopicIfNotExists(String topic) { |
|||
if (queues.contains(topic)) { |
|||
return; |
|||
} |
|||
|
|||
try { |
|||
client.createQueue(topic); |
|||
queues.add(topic); |
|||
} catch (ServiceBusException | InterruptedException e) { |
|||
log.error("Failed to create queue: [{}]", topic, e); |
|||
} |
|||
} |
|||
|
|||
@PreDestroy |
|||
private void destroy() { |
|||
try { |
|||
client.close(); |
|||
} catch (IOException e) { |
|||
log.error("Failed to close ManagementClient."); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,210 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.azure.servicebus; |
|||
|
|||
import com.google.gson.Gson; |
|||
import com.google.protobuf.InvalidProtocolBufferException; |
|||
import com.microsoft.azure.servicebus.TransactionContext; |
|||
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; |
|||
import com.microsoft.azure.servicebus.primitives.CoreMessageReceiver; |
|||
import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag; |
|||
import com.microsoft.azure.servicebus.primitives.MessagingEntityType; |
|||
import com.microsoft.azure.servicebus.primitives.MessagingFactory; |
|||
import com.microsoft.azure.servicebus.primitives.SettleModePair; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.qpid.proton.amqp.messaging.Data; |
|||
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; |
|||
import org.apache.qpid.proton.amqp.transport.SenderSettleMode; |
|||
import org.springframework.util.CollectionUtils; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueMsg; |
|||
import org.thingsboard.server.queue.TbQueueMsgDecoder; |
|||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg; |
|||
|
|||
import java.time.Duration; |
|||
import java.util.Collection; |
|||
import java.util.Collections; |
|||
import java.util.HashSet; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.concurrent.CompletableFuture; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.stream.Collectors; |
|||
import java.util.stream.Stream; |
|||
|
|||
@Slf4j |
|||
public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { |
|||
private final TbQueueAdmin admin; |
|||
private final String topic; |
|||
private final TbQueueMsgDecoder<T> decoder; |
|||
private final TbServiceBusSettings serviceBusSettings; |
|||
|
|||
private final Gson gson = new Gson(); |
|||
|
|||
private Set<CoreMessageReceiver> receivers; |
|||
private volatile Set<TopicPartitionInfo> partitions; |
|||
private volatile boolean subscribed; |
|||
private volatile boolean stopped = false; |
|||
private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>(); |
|||
private volatile int messagesPerQueue; |
|||
|
|||
public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) { |
|||
this.admin = admin; |
|||
this.decoder = decoder; |
|||
this.topic = topic; |
|||
this.serviceBusSettings = serviceBusSettings; |
|||
} |
|||
|
|||
@Override |
|||
public String getTopic() { |
|||
return topic; |
|||
} |
|||
|
|||
@Override |
|||
public void subscribe() { |
|||
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); |
|||
subscribed = false; |
|||
} |
|||
|
|||
@Override |
|||
public void subscribe(Set<TopicPartitionInfo> partitions) { |
|||
this.partitions = partitions; |
|||
subscribed = false; |
|||
} |
|||
|
|||
@Override |
|||
public void unsubscribe() { |
|||
stopped = true; |
|||
receivers.forEach(CoreMessageReceiver::closeAsync); |
|||
} |
|||
|
|||
@Override |
|||
public List<T> poll(long durationInMillis) { |
|||
if (!subscribed && partitions == null) { |
|||
try { |
|||
Thread.sleep(durationInMillis); |
|||
} catch (InterruptedException e) { |
|||
log.debug("Failed to await subscription", e); |
|||
} |
|||
} else { |
|||
if (!subscribed) { |
|||
createReceivers(); |
|||
messagesPerQueue = receivers.size() / partitions.size(); |
|||
subscribed = true; |
|||
} |
|||
|
|||
List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures = |
|||
receivers.stream() |
|||
.map(receiver -> receiver |
|||
.receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis)) |
|||
.whenComplete((messages, err) -> { |
|||
if (!CollectionUtils.isEmpty(messages)) { |
|||
pendingMessages.put(receiver, messages); |
|||
} else if (err != null) { |
|||
log.error("Failed to receive messages.", err); |
|||
} |
|||
})) |
|||
.collect(Collectors.toList()); |
|||
try { |
|||
return fromList(messageFutures) |
|||
.get() |
|||
.stream() |
|||
.flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream()) |
|||
.map(message -> { |
|||
try { |
|||
return decode(message); |
|||
} catch (InvalidProtocolBufferException e) { |
|||
log.error("Failed to parse message.", e); |
|||
throw new RuntimeException("Failed to parse message.", e); |
|||
} |
|||
}).collect(Collectors.toList()); |
|||
} catch (InterruptedException | ExecutionException e) { |
|||
if (stopped) { |
|||
log.info("[{}] Service Bus consumer is stopped.", topic); |
|||
} else { |
|||
log.error("Failed to receive messages", e); |
|||
} |
|||
} |
|||
} |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
private void createReceivers() { |
|||
List<CompletableFuture<CoreMessageReceiver>> receiverFutures = partitions.stream() |
|||
.map(TopicPartitionInfo::getFullTopicName) |
|||
.map(queue -> { |
|||
MessagingFactory factory; |
|||
try { |
|||
factory = MessagingFactory.createFromConnectionStringBuilder(createConnection(queue)); |
|||
} catch (InterruptedException | ExecutionException e) { |
|||
log.error("Failed to create factory for the queue [{}]", queue); |
|||
throw new RuntimeException("Failed to create the factory", e); |
|||
} |
|||
|
|||
return CoreMessageReceiver.create(factory, queue, queue, 0, |
|||
new SettleModePair(SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND), |
|||
MessagingEntityType.QUEUE); |
|||
}).collect(Collectors.toList()); |
|||
|
|||
try { |
|||
receivers = new HashSet<>(fromList(receiverFutures).get()); |
|||
} catch (InterruptedException | ExecutionException e) { |
|||
if (stopped) { |
|||
log.info("[{}] Service Bus consumer is stopped.", topic); |
|||
} else { |
|||
log.error("Failed to create receivers", e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private ConnectionStringBuilder createConnection(String queue) { |
|||
admin.createTopicIfNotExists(queue); |
|||
return new ConnectionStringBuilder( |
|||
serviceBusSettings.getNamespaceName(), |
|||
queue, |
|||
serviceBusSettings.getSasKeyName(), |
|||
serviceBusSettings.getSasKey()); |
|||
} |
|||
|
|||
private <V> CompletableFuture<List<V>> fromList(List<CompletableFuture<V>> futures) { |
|||
CompletableFuture<Collection<V>>[] arrayFuture = new CompletableFuture[futures.size()]; |
|||
futures.toArray(arrayFuture); |
|||
|
|||
return CompletableFuture |
|||
.allOf(arrayFuture) |
|||
.thenApply(v -> futures |
|||
.stream() |
|||
.map(CompletableFuture::join) |
|||
.collect(Collectors.toList())); |
|||
} |
|||
|
|||
@Override |
|||
public void commit() { |
|||
pendingMessages.forEach((receiver, msgs) -> |
|||
msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN))); |
|||
pendingMessages.clear(); |
|||
} |
|||
|
|||
private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { |
|||
DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class); |
|||
return decoder.decode(msg); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,110 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.azure.servicebus; |
|||
|
|||
import com.google.gson.Gson; |
|||
import com.microsoft.azure.servicebus.IMessage; |
|||
import com.microsoft.azure.servicebus.Message; |
|||
import com.microsoft.azure.servicebus.QueueClient; |
|||
import com.microsoft.azure.servicebus.ReceiveMode; |
|||
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; |
|||
import com.microsoft.azure.servicebus.primitives.ServiceBusException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueCallback; |
|||
import org.thingsboard.server.queue.TbQueueMsg; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
import java.util.concurrent.CompletableFuture; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
|
|||
@Slf4j |
|||
public class TbServiceBusProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { |
|||
private final String defaultTopic; |
|||
private final Gson gson = new Gson(); |
|||
private final TbQueueAdmin admin; |
|||
private final TbServiceBusSettings serviceBusSettings; |
|||
private final Map<String, QueueClient> clients = new HashMap<>(); |
|||
private ExecutorService executorService; |
|||
|
|||
public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) { |
|||
this.admin = admin; |
|||
this.defaultTopic = defaultTopic; |
|||
this.serviceBusSettings = serviceBusSettings; |
|||
executorService = Executors.newSingleThreadExecutor(); |
|||
} |
|||
|
|||
@Override |
|||
public void init() { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public String getDefaultTopic() { |
|||
return defaultTopic; |
|||
} |
|||
|
|||
@Override |
|||
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { |
|||
IMessage message = new Message(gson.toJson(new DefaultTbQueueMsg(msg))); |
|||
CompletableFuture<Void> future = getClient(tpi.getFullTopicName()).sendAsync(message); |
|||
future.whenCompleteAsync((success, err) -> { |
|||
if (err != null) { |
|||
callback.onFailure(err); |
|||
} else { |
|||
callback.onSuccess(null); |
|||
} |
|||
}, executorService); |
|||
} |
|||
|
|||
@Override |
|||
public void stop() { |
|||
clients.forEach((t, client) -> { |
|||
try { |
|||
client.close(); |
|||
} catch (ServiceBusException e) { |
|||
log.error("Failed to close QueueClient.", e); |
|||
} |
|||
}); |
|||
|
|||
if (executorService != null) { |
|||
executorService.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
private QueueClient getClient(String topic) { |
|||
return clients.computeIfAbsent(topic, k -> { |
|||
admin.createTopicIfNotExists(topic); |
|||
ConnectionStringBuilder builder = |
|||
new ConnectionStringBuilder( |
|||
serviceBusSettings.getNamespaceName(), |
|||
topic, |
|||
serviceBusSettings.getSasKeyName(), |
|||
serviceBusSettings.getSasKey()); |
|||
try { |
|||
return new QueueClient(builder, ReceiveMode.PEEKLOCK); |
|||
} catch (InterruptedException | ServiceBusException e) { |
|||
log.error("Failed to create new client for the Queue: [{}]", topic, e); |
|||
throw new RuntimeException("Failed to create new client for the Queue", e); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.azure.servicebus; |
|||
|
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Slf4j |
|||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") |
|||
@Component |
|||
@Data |
|||
public class TbServiceBusSettings { |
|||
@Value("${queue.service_bus.namespace_name}") |
|||
private String namespaceName; |
|||
@Value("${queue.service_bus.sas_key_name}") |
|||
private String sasKeyName; |
|||
@Value("${queue.service_bus.sas_key}") |
|||
private String sasKey; |
|||
@Value("${queue.service_bus.max_messages}") |
|||
private int maxMessages; |
|||
} |
|||
@ -0,0 +1,134 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.provider; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
|||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") |
|||
public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { |
|||
|
|||
private final PartitionService partitionService; |
|||
private final TbQueueCoreSettings coreSettings; |
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final TbQueueRuleEngineSettings ruleEngineSettings; |
|||
private final TbQueueTransportApiSettings transportApiSettings; |
|||
private final TbQueueTransportNotificationSettings transportNotificationSettings; |
|||
private final TbServiceBusSettings serviceBusSettings; |
|||
private final TbQueueAdmin admin; |
|||
|
|||
public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
|||
TbQueueRuleEngineSettings ruleEngineSettings, |
|||
TbServiceInfoProvider serviceInfoProvider, |
|||
TbQueueTransportApiSettings transportApiSettings, |
|||
TbQueueTransportNotificationSettings transportNotificationSettings, |
|||
TbServiceBusSettings serviceBusSettings, |
|||
TbQueueAdmin admin) { |
|||
this.partitionService = partitionService; |
|||
this.coreSettings = coreSettings; |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
this.ruleEngineSettings = ruleEngineSettings; |
|||
this.transportApiSettings = transportApiSettings; |
|||
this.transportNotificationSettings = transportNotificationSettings; |
|||
this.serviceBusSettings = serviceBusSettings; |
|||
this.admin = admin; |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, |
|||
partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, |
|||
partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic()); |
|||
} |
|||
} |
|||
@ -0,0 +1,116 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.provider; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") |
|||
public class ServiceBusTbCoreQueueProvider implements TbCoreQueueFactory { |
|||
|
|||
private final TbServiceBusSettings serviceBusSettings; |
|||
private final TbQueueRuleEngineSettings ruleEngineSettings; |
|||
private final TbQueueCoreSettings coreSettings; |
|||
private final TbQueueTransportApiSettings transportApiSettings; |
|||
private final PartitionService partitionService; |
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final TbQueueAdmin admin; |
|||
|
|||
public ServiceBusTbCoreQueueProvider(TbServiceBusSettings serviceBusSettings, |
|||
TbQueueCoreSettings coreSettings, |
|||
TbQueueTransportApiSettings transportApiSettings, |
|||
TbQueueRuleEngineSettings ruleEngineSettings, |
|||
PartitionService partitionService, |
|||
TbServiceInfoProvider serviceInfoProvider, |
|||
TbQueueAdmin admin) { |
|||
this.serviceBusSettings = serviceBusSettings; |
|||
this.coreSettings = coreSettings; |
|||
this.transportApiSettings = transportApiSettings; |
|||
this.ruleEngineSettings = ruleEngineSettings; |
|||
this.partitionService = partitionService; |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
this.admin = admin; |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, |
|||
partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
} |
|||
@ -0,0 +1,99 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.provider; |
|||
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
|||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") |
|||
public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
|||
|
|||
private final PartitionService partitionService; |
|||
private final TbQueueCoreSettings coreSettings; |
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final TbQueueRuleEngineSettings ruleEngineSettings; |
|||
private final TbServiceBusSettings serviceBusSettings; |
|||
private final TbQueueAdmin admin; |
|||
|
|||
public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
|||
TbQueueRuleEngineSettings ruleEngineSettings, |
|||
TbServiceInfoProvider serviceInfoProvider, |
|||
TbServiceBusSettings serviceBusSettings, |
|||
TbQueueAdmin admin) { |
|||
this.partitionService = partitionService; |
|||
this.coreSettings = coreSettings; |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
this.ruleEngineSettings = ruleEngineSettings; |
|||
this.serviceBusSettings = serviceBusSettings; |
|||
this.admin = admin; |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, |
|||
partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
} |
|||
@ -0,0 +1,94 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.provider; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.TbQueueRequestTemplate; |
|||
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
|||
import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
|||
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
|||
|
|||
@Component |
|||
@ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") |
|||
@Slf4j |
|||
public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory { |
|||
private final TbQueueTransportApiSettings transportApiSettings; |
|||
private final TbQueueTransportNotificationSettings transportNotificationSettings; |
|||
private final TbServiceBusSettings serviceBusSettings; |
|||
private final TbQueueAdmin admin; |
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
|
|||
public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, |
|||
TbQueueTransportNotificationSettings transportNotificationSettings, |
|||
TbServiceBusSettings serviceBusSettings, |
|||
TbServiceInfoProvider serviceInfoProvider, |
|||
TbQueueAdmin admin) { |
|||
this.transportApiSettings = transportApiSettings; |
|||
this.transportNotificationSettings = transportNotificationSettings; |
|||
this.serviceBusSettings = serviceBusSettings; |
|||
this.admin = admin; |
|||
this.serviceInfoProvider = serviceInfoProvider; |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiRequestTemplate() { |
|||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> producerTemplate = |
|||
new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); |
|||
|
|||
TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> consumerTemplate = |
|||
new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, |
|||
transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
|
|||
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder |
|||
<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); |
|||
templateBuilder.queueAdmin(admin); |
|||
templateBuilder.requestTemplate(producerTemplate); |
|||
templateBuilder.responseTemplate(consumerTemplate); |
|||
templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); |
|||
templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout()); |
|||
templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval()); |
|||
return templateBuilder.build(); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { |
|||
return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); |
|||
} |
|||
|
|||
@Override |
|||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsConsumer() { |
|||
return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, |
|||
transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), |
|||
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); |
|||
} |
|||
} |
|||
Loading…
Reference in new issue