diff --git a/framework/src/Volo.Abp.Cli.Core/Volo/Abp/Cli/ProjectBuilding/Building/ModuleInfo.cs b/framework/src/Volo.Abp.Cli.Core/Volo/Abp/Cli/ProjectBuilding/Building/ModuleInfo.cs index 260fbb745f..1d8da0e3b1 100644 --- a/framework/src/Volo.Abp.Cli.Core/Volo/Abp/Cli/ProjectBuilding/Building/ModuleInfo.cs +++ b/framework/src/Volo.Abp.Cli.Core/Volo/Abp/Cli/ProjectBuilding/Building/ModuleInfo.cs @@ -23,5 +23,7 @@ public bool MvcUi { get; set; } public bool BlazorUi { get; set; } + + public bool IsFreeToActiveLicenseOwners { get; set; } } } diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs index 1769d8a076..26d15ce818 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs @@ -9,11 +9,13 @@ namespace Volo.Abp.Kafka public KafkaConnections Connections { get; } public Action ConfigureProducer { get; set; } - + public Action ConfigureConsumer { get; set; } public Action ConfigureTopic { get; set; } + public bool ReQueue { get; set; } = true; + public AbpKafkaOptions() { Connections = new KafkaConnections(); diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs index bb7e2d66a3..2ff70dd8d5 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Linq; using System.Threading.Tasks; using Confluent.Kafka; using Confluent.Kafka.Admin; @@ -20,6 +19,8 @@ namespace Volo.Abp.Kafka protected IConsumerPool ConsumerPool { get; } + protected IProducerPool ProducerPool { get; } + protected IExceptionNotifier ExceptionNotifier { get; } protected AbpKafkaOptions Options { get; } @@ -37,10 +38,12 @@ namespace Volo.Abp.Kafka public KafkaMessageConsumer( IConsumerPool consumerPool, IExceptionNotifier exceptionNotifier, - IOptions options) + IOptions options, + IProducerPool producerPool) { ConsumerPool = consumerPool; ExceptionNotifier = exceptionNotifier; + ProducerPool = producerPool; Options = options.Value; Logger = NullLogger.Instance; @@ -132,14 +135,29 @@ namespace Volo.Abp.Kafka { await callback(consumeResult.Message); } - - Consumer.Commit(consumeResult); } catch (Exception ex) { + await RequeueAsync(consumeResult); + Logger.LogException(ex); await ExceptionNotifier.NotifyAsync(ex); } + finally + { + Consumer.Commit(consumeResult); + } + } + + protected virtual async Task RequeueAsync(ConsumeResult consumeResult) + { + if (!Options.ReQueue) + { + return; + } + + var producer = ProducerPool.Get(ConnectionName); + await producer.ProduceAsync(consumeResult.Topic, consumeResult.Message); } public virtual void Dispose() diff --git a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs index 0c180a4ac7..f757efab64 100644 --- a/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs +++ b/framework/src/Volo.Abp.RabbitMQ/Volo/Abp/RabbitMQ/RabbitMqMessageConsumer.cs @@ -191,7 +191,16 @@ namespace Volo.Abp.RabbitMQ } catch (Exception ex) { - Channel.BasicNack(basicDeliverEventArgs.DeliveryTag, multiple: false, requeue: true); + try + { + Channel.BasicNack( + basicDeliverEventArgs.DeliveryTag, + multiple: false, + requeue: true + ); + } + catch { } + Logger.LogException(ex); await ExceptionNotifier.NotifyAsync(ex); }