// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Worker
{
    /// <summary>
    /// Hosted service that connects to RabbitMQ, declares the "orders" queue and logs
    /// the UTF-8 text of every message delivered to its consumer.
    /// </summary>
    public class QueueWorker : IHostedService
    {
        private const string QueueName = "orders";

        // Total number of connection attempts before the last failure is rethrown.
        private const int MaxConnectAttempts = 5;

        // Pause between two consecutive connection attempts.
        private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(5);

        private readonly IConfiguration _configuration;
        private readonly ILogger _logger;

        // Kept so that StopAsync (or a failed start) can release them.
        private IConnection? _connection;
        private IModel? _channel;

        /// <summary>
        /// Creates the worker.
        /// </summary>
        /// <param name="logger">Logger used for dequeued messages and connection failures.</param>
        /// <param name="configuration">
        /// Configuration read for <c>connectionstring:rabbit</c>, or for
        /// <c>service:rabbit:host</c> / <c>service:rabbit:port</c> when no connection string is set.
        /// </param>
        /// <remarks>
        /// NOTE(review): the parameter is the non-generic <see cref="ILogger"/>; confirm the host
        /// actually registers it (a generic logger argument may have been intended).
        /// </remarks>
        public QueueWorker(ILogger logger, IConfiguration configuration)
        {
            _logger = logger;
            _configuration = configuration;
        }

        /// <summary>
        /// Connects to RabbitMQ, declares the queue and attaches a consumer that logs each message.
        /// </summary>
        /// <param name="cancellationToken">Cancels the wait between connection attempts.</param>
        /// <returns>A task that completes once the consumer has been registered.</returns>
        /// <exception cref="Exception">
        /// Any failure is logged, the connection (if one was opened) is released, and the
        /// exception is rethrown.
        /// </exception>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                var channel = await ConnectAsync(cancellationToken);
                _channel = channel;

                channel.QueueDeclare(
                    queue: QueueName,
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // Use the raw log API to avoid formatting the JSON string (since it has {})
                    var text = Encoding.UTF8.GetString(ea.Body);
                    _logger.Log(LogLevel.Information, 0, "Dequeued " + text, exception: null, formatter: (m, e) => m);
                };

                channel.BasicConsume(
                    queue: QueueName,
                    autoAck: true,
                    consumer: consumer);
            }
            catch (Exception ex)
            {
                _logger.LogError(0, ex, "Failed to start listening to rabbit mq");

                // Do not leave a half-initialized connection behind when startup fails.
                ReleaseConnection();
                throw;
            }
        }

        /// <summary>
        /// Releases the channel and connection opened by <see cref="StartAsync"/>, if any.
        /// </summary>
        /// <param name="cancellationToken">Not used.</param>
        /// <returns>A completed task.</returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            ReleaseConnection();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Opens a connection and a channel, retrying up to <see cref="MaxConnectAttempts"/> times
        /// with <see cref="RetryDelay"/> between attempts. Every failure is logged; the last one
        /// is rethrown with its original stack trace.
        /// </summary>
        /// <param name="cancellationToken">Cancels the delay between attempts.</param>
        /// <returns>The channel created on the new connection.</returns>
        private async Task<IModel> ConnectAsync(CancellationToken cancellationToken)
        {
            // The loop has no exit condition on purpose: it ends either by returning the
            // channel or by rethrowing from the catch block on the final attempt.
            for (var attempt = 1; ; attempt++)
            {
                IConnection? connection = null;
                try
                {
                    var factory = new ConnectionFactory()
                    {
                        Endpoint = ResolveEndpoint(),
                    };

                    connection = factory.CreateConnection();
                    var channel = connection.CreateModel();

                    _connection = connection;
                    return channel;
                }
                catch (Exception ex)
                {
                    _logger.LogError(0, ex, "Failed to start listening to rabbit mq");

                    // A connection may have been opened before the channel creation failed.
                    connection?.Dispose();

                    if (attempt >= MaxConnectAttempts)
                    {
                        // Rethrow immediately instead of waiting out one more delay.
                        throw;
                    }
                }

                await Task.Delay(RetryDelay, cancellationToken);
            }
        }

        /// <summary>
        /// Builds the AMQP endpoint from configuration: the <c>connectionstring:rabbit</c> URI when
        /// present, otherwise <c>service:rabbit:host</c> and <c>service:rabbit:port</c>.
        /// </summary>
        private AmqpTcpEndpoint ResolveEndpoint()
        {
            var connectionString = _configuration["connectionstring:rabbit"];
            if (connectionString != null)
            {
                return new AmqpTcpEndpoint(new Uri(connectionString));
            }

            var host = _configuration["service:rabbit:host"];

            // The port is machine-readable configuration, so parse it culture-independently.
            var port = int.Parse(_configuration["service:rabbit:port"], CultureInfo.InvariantCulture);
            return new AmqpTcpEndpoint(host, port);
        }

        /// <summary>
        /// Disposes the stored channel and connection (channel first) and clears both fields,
        /// so that calling it again is a no-op.
        /// </summary>
        private void ReleaseConnection()
        {
            var channel = _channel;
            var connection = _connection;
            _channel = null;
            _connection = null;

            try
            {
                channel?.Dispose();
            }
            finally
            {
                // Still release the connection if disposing the channel throws.
                connection?.Dispose();
            }
        }
    }
}