committed by
GitHub
36 changed files with 988 additions and 276 deletions
@ -0,0 +1,114 @@ |
|||
using Elastic.Clients.Elasticsearch; |
|||
using Elastic.Clients.Elasticsearch.Core.Bulk; |
|||
using LINGYUN.Abp.Elasticsearch; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.Auditing; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging.Elasticsearch; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class ElasticsearchAuditLogWriter : IAuditLogWriter, ITransientDependency |
|||
{ |
|||
private readonly IAuditLogInfoToAuditLogConverter _auditLogConverter; |
|||
private readonly IElasticsearchClientFactory _clientFactory; |
|||
private readonly IIndexNameNormalizer _indexNameNormalizer; |
|||
|
|||
private readonly ILogger<ElasticsearchAuditLogWriter> _logger; |
|||
|
|||
public ElasticsearchAuditLogWriter( |
|||
IAuditLogInfoToAuditLogConverter auditLogConverter, |
|||
IElasticsearchClientFactory clientFactory, |
|||
IIndexNameNormalizer indexNameNormalizer, |
|||
ILogger<ElasticsearchAuditLogWriter> logger) |
|||
{ |
|||
_auditLogConverter = auditLogConverter; |
|||
_clientFactory = clientFactory; |
|||
_indexNameNormalizer = indexNameNormalizer; |
|||
_logger = logger; |
|||
} |
|||
|
|||
public async virtual Task WriteAsync(AuditLogInfo auditLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
var client = _clientFactory.Create(); |
|||
var auditLog = await _auditLogConverter.ConvertAsync(auditLogInfo); |
|||
var response = await client.IndexAsync( |
|||
auditLog, |
|||
dsl => dsl.Index(CreateIndex()) |
|||
.Id(auditLog.Id), |
|||
cancellationToken); |
|||
|
|||
if (!response.IsValidResponse) |
|||
{ |
|||
_logger.LogWarning("Could not save the audit log object: " + Environment.NewLine + auditLog.ToString()); |
|||
if (response.TryGetOriginalException(out var ex)) |
|||
{ |
|||
_logger.LogWarning(ex, ex.Message); |
|||
} |
|||
else if (response.ElasticsearchServerError != null) |
|||
{ |
|||
_logger.LogWarning(response.ElasticsearchServerError.ToString()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public async virtual Task BulkWriteAsync(IEnumerable<AuditLogInfo> auditLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
if (!auditLogInfos.Any()) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var client = _clientFactory.Create(); |
|||
|
|||
var indexName = CreateIndex(); |
|||
|
|||
var bulkOperations = new List<BulkOperation>(); |
|||
foreach (var auditLogInfo in auditLogInfos) |
|||
{ |
|||
var auditLog = await _auditLogConverter.ConvertAsync(auditLogInfo); |
|||
bulkOperations.Add(new BulkCreateOperation<AuditLog>(auditLog) |
|||
{ |
|||
Id = auditLog.Id.ToString() |
|||
}); |
|||
} |
|||
|
|||
var bulkRequest = new BulkRequest |
|||
{ |
|||
Operations = [.. bulkOperations], |
|||
Index = indexName |
|||
}; |
|||
|
|||
var response = await client.BulkAsync(bulkRequest, cancellationToken); |
|||
|
|||
if (!response.IsValidResponse) |
|||
{ |
|||
await HandleBulkErrorsAsync(response, auditLogInfos); |
|||
} |
|||
} |
|||
|
|||
private Task HandleBulkErrorsAsync(BulkResponse response, IEnumerable<AuditLogInfo> auditLogInfos) |
|||
{ |
|||
foreach (var itemWithError in response.ItemsWithErrors) |
|||
{ |
|||
_logger.LogError($"Failed to write audit log: {itemWithError.Error?.Reason}"); |
|||
} |
|||
|
|||
foreach (var auditLog in auditLogInfos) |
|||
{ |
|||
_logger.LogInformation(auditLog.ToString()); |
|||
} |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
protected virtual string CreateIndex() |
|||
{ |
|||
return _indexNameNormalizer.NormalizeIndex("audit-log"); |
|||
} |
|||
} |
|||
@ -0,0 +1,122 @@ |
|||
using Elastic.Clients.Elasticsearch; |
|||
using Elastic.Clients.Elasticsearch.Core.Bulk; |
|||
using LINGYUN.Abp.Elasticsearch; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Guids; |
|||
using Volo.Abp.SecurityLog; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging.Elasticsearch; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class ElasticsearchSecurityLogWriter : ISecurityLogWriter, ITransientDependency |
|||
{ |
|||
private readonly IElasticsearchClientFactory _clientFactory; |
|||
private readonly IIndexNameNormalizer _indexNameNormalizer; |
|||
private readonly IGuidGenerator _guidGenerator; |
|||
|
|||
private readonly ILogger<ElasticsearchSecurityLogWriter> _logger; |
|||
|
|||
public ElasticsearchSecurityLogWriter( |
|||
IElasticsearchClientFactory clientFactory, |
|||
IIndexNameNormalizer indexNameNormalizer, |
|||
IGuidGenerator guidGenerator, |
|||
ILogger<ElasticsearchSecurityLogWriter> logger) |
|||
{ |
|||
_clientFactory = clientFactory; |
|||
_indexNameNormalizer = indexNameNormalizer; |
|||
_guidGenerator = guidGenerator; |
|||
_logger = logger; |
|||
} |
|||
|
|||
public async virtual Task WriteAsync(SecurityLogInfo securityLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
var client = _clientFactory.Create(); |
|||
|
|||
var securityLog = new SecurityLog( |
|||
_guidGenerator.Create(), |
|||
securityLogInfo); |
|||
|
|||
var response = await client.IndexAsync( |
|||
securityLog, |
|||
dsl => dsl.Index(CreateIndex()) |
|||
.Id(securityLog.Id), |
|||
cancellationToken); |
|||
|
|||
if (!response.IsValidResponse) |
|||
{ |
|||
_logger.LogWarning("Could not save the security log object: " + Environment.NewLine + securityLog.ToString()); |
|||
if (response.TryGetOriginalException(out var ex)) |
|||
{ |
|||
_logger.LogWarning(ex, ex.Message); |
|||
} |
|||
else if (response.ElasticsearchServerError != null) |
|||
{ |
|||
_logger.LogWarning(response.ElasticsearchServerError.ToString()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public async virtual Task BulkWriteAsync(IEnumerable<SecurityLogInfo> securityLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
if (!securityLogInfos.Any()) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var client = _clientFactory.Create(); |
|||
|
|||
var indexName = CreateIndex(); |
|||
|
|||
var bulkOperations = new List<BulkOperation>(); |
|||
foreach (var securityLogInfo in securityLogInfos) |
|||
{ |
|||
var securityLog = new SecurityLog( |
|||
_guidGenerator.Create(), |
|||
securityLogInfo); |
|||
|
|||
bulkOperations.Add(new BulkCreateOperation<SecurityLog>(securityLog) |
|||
{ |
|||
Id = securityLog.Id.ToString() |
|||
}); |
|||
} |
|||
|
|||
var bulkRequest = new BulkRequest |
|||
{ |
|||
Operations = [.. bulkOperations], |
|||
Index = indexName |
|||
}; |
|||
|
|||
var response = await client.BulkAsync(bulkRequest, cancellationToken); |
|||
|
|||
if (!response.IsValidResponse) |
|||
{ |
|||
await HandleBulkErrorsAsync(response, securityLogInfos); |
|||
} |
|||
} |
|||
|
|||
private Task HandleBulkErrorsAsync(BulkResponse response, IEnumerable<SecurityLogInfo> securityLogInfos) |
|||
{ |
|||
foreach (var itemWithError in response.ItemsWithErrors) |
|||
{ |
|||
_logger.LogError($"Failed to write security log: {itemWithError.Error?.Reason}"); |
|||
} |
|||
|
|||
foreach (var auditLog in securityLogInfos) |
|||
{ |
|||
_logger.LogInformation(auditLog.ToString()); |
|||
} |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
protected virtual string CreateIndex() |
|||
{ |
|||
return _indexNameNormalizer.NormalizeIndex("security-log"); |
|||
} |
|||
} |
|||
@ -0,0 +1,62 @@ |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.Auditing; |
|||
using Volo.Abp.AuditLogging; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Guids; |
|||
using Volo.Abp.Uow; |
|||
using IVoloAuditLogInfoToAuditLogConverter = Volo.Abp.AuditLogging.IAuditLogInfoToAuditLogConverter; |
|||
using VoloAuditLog = Volo.Abp.AuditLogging.AuditLog; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging.EntityFrameworkCore; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class EfCoreAuditLogWriter : IAuditLogWriter, ITransientDependency |
|||
{ |
|||
protected IVoloAuditLogInfoToAuditLogConverter AuditLogConverter { get; } |
|||
protected IAuditLogRepository AuditLogRepository { get; } |
|||
protected IUnitOfWorkManager UnitOfWorkManager { get; } |
|||
protected IGuidGenerator GuidGenerator { get; } |
|||
|
|||
public EfCoreAuditLogWriter( |
|||
IVoloAuditLogInfoToAuditLogConverter auditLogConverter, |
|||
IAuditLogRepository auditLogRepository, |
|||
IUnitOfWorkManager unitOfWorkManager, |
|||
IGuidGenerator guidGenerator) |
|||
{ |
|||
AuditLogConverter = auditLogConverter; |
|||
AuditLogRepository = auditLogRepository; |
|||
UnitOfWorkManager = unitOfWorkManager; |
|||
GuidGenerator = guidGenerator; |
|||
} |
|||
|
|||
public async virtual Task WriteAsync(AuditLogInfo auditLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
using (var uow = UnitOfWorkManager.Begin(true)) |
|||
{ |
|||
var auditLog = await AuditLogConverter.ConvertAsync(auditLogInfo); |
|||
|
|||
await AuditLogRepository.InsertAsync(auditLog); |
|||
|
|||
await uow.CompleteAsync(); |
|||
} |
|||
} |
|||
|
|||
public async virtual Task BulkWriteAsync(IEnumerable<AuditLogInfo> auditLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
using (var uow = UnitOfWorkManager.Begin(true)) |
|||
{ |
|||
var auditLogs = new List<VoloAuditLog>(); |
|||
foreach (var auditLogInfo in auditLogInfos) |
|||
{ |
|||
var auditLog = await AuditLogConverter.ConvertAsync(auditLogInfo); |
|||
auditLogs.Add(auditLog); |
|||
} |
|||
|
|||
await AuditLogRepository.InsertManyAsync(auditLogs); |
|||
|
|||
await uow.CompleteAsync(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,57 @@ |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.Guids; |
|||
using Volo.Abp.Identity; |
|||
using Volo.Abp.SecurityLog; |
|||
using Volo.Abp.Uow; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging.EntityFrameworkCore; |
|||
|
|||
[Dependency(ReplaceServices = true)] |
|||
public class EfCoreSecurityLogWriter : ISecurityLogWriter, ITransientDependency |
|||
{ |
|||
protected IIdentitySecurityLogRepository IdentitySecurityLogRepository { get; } |
|||
protected IUnitOfWorkManager UnitOfWorkManager { get; } |
|||
protected IGuidGenerator GuidGenerator { get; } |
|||
|
|||
public EfCoreSecurityLogWriter( |
|||
IIdentitySecurityLogRepository identitySecurityLogRepository, |
|||
IUnitOfWorkManager unitOfWorkManager, |
|||
IGuidGenerator guidGenerator) |
|||
{ |
|||
IdentitySecurityLogRepository = identitySecurityLogRepository; |
|||
UnitOfWorkManager = unitOfWorkManager; |
|||
GuidGenerator = guidGenerator; |
|||
} |
|||
|
|||
public async virtual Task BulkWriteAsync(IEnumerable<SecurityLogInfo> securityLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
using (var uow = UnitOfWorkManager.Begin(requiresNew: true)) |
|||
{ |
|||
var securityLogs = securityLogInfos.Select(securityLogInfo => |
|||
new IdentitySecurityLog(GuidGenerator, securityLogInfo)); |
|||
|
|||
await IdentitySecurityLogRepository.InsertManyAsync( |
|||
securityLogs, |
|||
false, |
|||
cancellationToken); |
|||
|
|||
await uow.CompleteAsync(); |
|||
} |
|||
} |
|||
|
|||
public async virtual Task WriteAsync(SecurityLogInfo securityLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
using (var uow = UnitOfWorkManager.Begin(requiresNew: true)) |
|||
{ |
|||
await IdentitySecurityLogRepository.InsertAsync( |
|||
new IdentitySecurityLog(GuidGenerator, securityLogInfo), |
|||
false, |
|||
cancellationToken); |
|||
await uow.CompleteAsync(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,54 @@ |
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public class AbpAuditLoggingOptions |
|||
{ |
|||
/// <summary>
|
|||
/// 是否启用审计日志记录
|
|||
/// </summary>
|
|||
public bool IsAuditLogEnabled { get; set; } |
|||
/// <summary>
|
|||
/// 使用审计日志队列
|
|||
/// </summary>
|
|||
/// <remarks>
|
|||
/// 不启用队列则直接写入到持久化设施
|
|||
/// </remarks>
|
|||
public bool UseAuditLogQueue { get; set; } |
|||
/// <summary>
|
|||
/// 审计日志最大队列大小
|
|||
/// </summary>
|
|||
public int MaxAuditLogQueueSize { get; set; } |
|||
/// <summary>
|
|||
/// 一次处理审计日志队列大小
|
|||
/// </summary>
|
|||
public int BatchAuditLogSize { get; set; } |
|||
/// <summary>
|
|||
/// 是否启用安全日志记录
|
|||
/// </summary>
|
|||
public bool IsSecurityLogEnabled { get; set; } |
|||
/// <summary>
|
|||
/// 使用安全日志队列
|
|||
/// </summary>
|
|||
/// <remarks>
|
|||
/// 不启用队列则直接写入到持久化设施
|
|||
/// </remarks>
|
|||
public bool UseSecurityLogQueue { get; set; } |
|||
/// <summary>
|
|||
/// 安全日志最大队列大小
|
|||
/// </summary>
|
|||
public int MaxSecurityLogQueueSize { get; set; } |
|||
/// <summary>
|
|||
/// 一次处理安全日志队列大小
|
|||
/// </summary>
|
|||
public int BatchSecurityLogSize { get; set; } |
|||
public AbpAuditLoggingOptions() |
|||
{ |
|||
IsAuditLogEnabled = true; |
|||
UseAuditLogQueue = true; |
|||
BatchAuditLogSize = 100; |
|||
MaxAuditLogQueueSize = 10000; |
|||
|
|||
IsSecurityLogEnabled = true; |
|||
UseSecurityLogQueue = true; |
|||
BatchSecurityLogSize = 100; |
|||
MaxSecurityLogQueueSize = 10000; |
|||
} |
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Options; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.Auditing; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.MultiTenancy; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public class AuditLogQueue : AuditLoggingQueue<AuditLogInfo>, IAuditLogQueue, ISingletonDependency |
|||
{ |
|||
private readonly IAuditLogWriter _auditLogWriter; |
|||
private readonly ICurrentTenant _currentTenant; |
|||
public AuditLogQueue( |
|||
IOptions<AbpAuditLoggingOptions> options, |
|||
IAuditLogWriter auditLogWriter, |
|||
ICurrentTenant currentTenant, |
|||
ILogger<AuditLogQueue> logger) |
|||
: base( |
|||
"AuditLog", |
|||
options.Value.MaxAuditLogQueueSize, |
|||
options.Value.BatchAuditLogSize, |
|||
logger) |
|||
{ |
|||
_auditLogWriter = auditLogWriter; |
|||
_currentTenant = currentTenant; |
|||
} |
|||
|
|||
protected async override Task BulkWriteAsync(IEnumerable<AuditLogInfo> auditLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
var tenantAuditlogGroup = auditLogInfos.GroupBy(x => x.ImpersonatorTenantId ?? x.TenantId); |
|||
foreach (var tenantAuditlogs in tenantAuditlogGroup) |
|||
{ |
|||
using (_currentTenant.Change(tenantAuditlogs.Key)) |
|||
{ |
|||
await _auditLogWriter.BulkWriteAsync(tenantAuditlogs, cancellationToken); |
|||
} |
|||
} |
|||
} |
|||
|
|||
protected async override Task WriteAsync(AuditLogInfo auditLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
using (_currentTenant.Change(auditLogInfo.ImpersonatorTenantId ?? auditLogInfo.TenantId)) |
|||
{ |
|||
await _auditLogWriter.WriteAsync(auditLogInfo, cancellationToken); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,196 @@ |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Channels; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public abstract class AuditLoggingQueue<TLog> |
|||
{ |
|||
private readonly ILogger<AuditLoggingQueue<TLog>> _logger; |
|||
private readonly Channel<TLog> _channel; |
|||
private readonly string _logName; |
|||
private readonly int _batchSize; |
|||
private readonly int _maxConcurrency; |
|||
|
|||
private volatile Task _consumerTask; |
|||
private readonly SemaphoreSlim _flushSemaphore; |
|||
private readonly CancellationTokenSource _cts = new CancellationTokenSource(); |
|||
protected AuditLoggingQueue( |
|||
string logName, |
|||
int maxQueueSize, |
|||
int batchSize, |
|||
ILogger<AuditLoggingQueue<TLog>> logger) |
|||
{ |
|||
_logName = logName; |
|||
_batchSize = batchSize; |
|||
_logger = logger; |
|||
|
|||
var channelOptions = new BoundedChannelOptions(maxQueueSize) |
|||
{ |
|||
FullMode = BoundedChannelFullMode.Wait, |
|||
SingleWriter = false, |
|||
SingleReader = true, |
|||
}; |
|||
_channel = Channel.CreateBounded<TLog>(channelOptions); |
|||
|
|||
var calculatedConcurrency = (int)Math.Ceiling(Math.Max(1, maxQueueSize) / (double)Math.Max(1, batchSize)); |
|||
var defaultConcurrency = Environment.ProcessorCount * 2; |
|||
_maxConcurrency = Math.Min(calculatedConcurrency, defaultConcurrency); |
|||
_flushSemaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency); |
|||
} |
|||
|
|||
public virtual Task EnqueueAsync(TLog log, CancellationToken cancellationToken = default) |
|||
{ |
|||
try |
|||
{ |
|||
if (!_channel.Writer.TryWrite(log)) |
|||
{ |
|||
_logger.LogWarning("{logName} channel is full; {logName} is recorded in the log.", _logName, _logName); |
|||
_logger.LogInformation(log!.ToString()); |
|||
} |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
catch (ChannelClosedException) |
|||
{ |
|||
_logger.LogWarning("{logName} channel is closed; dropping it.", _logName); |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
_logger.LogDebug("EnqueueAsync was canceled while waiting to write to {logName} channel.", _logName); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "Unexpected exception while enqueuing {logName}.", _logName); |
|||
} |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
public virtual Task StartAsync(CancellationToken cancellationToken) |
|||
{ |
|||
_consumerTask = Task.Factory.StartNew( |
|||
() => ConsumeBatchesAsync(), |
|||
CancellationToken.None, |
|||
TaskCreationOptions.LongRunning, |
|||
TaskScheduler.Default) |
|||
.Unwrap(); |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
|
|||
public async virtual Task StopAsync(CancellationToken cancellationToken) |
|||
{ |
|||
_channel.Writer.TryComplete(); |
|||
|
|||
if (_consumerTask != null) |
|||
{ |
|||
try |
|||
{ |
|||
await Task.WhenAny(_consumerTask, Task.Delay(TimeSpan.FromSeconds(5), cancellationToken)); |
|||
_cts.Cancel(); |
|||
_logger.LogInformation("Stopped consumer loop: {logName}.", _logName); |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
_logger.LogWarning("StopAsync was canceled while waiting for consumer loop: {logName} to finish.", _logName); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "Exception while stopping consumer loop: {logName}.", _logName); |
|||
} |
|||
finally |
|||
{ |
|||
_cts.Dispose(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private async Task ConsumeBatchesAsync() |
|||
{ |
|||
var reader = _channel.Reader; |
|||
var batchSize = Math.Max(1, _batchSize); |
|||
var batchSendTaskList = new List<Task>(_maxConcurrency * 2); |
|||
|
|||
_logger.LogInformation("Starting {logName} consumer loop. BatchSize={batchSize}", _logName, batchSize); |
|||
|
|||
try |
|||
{ |
|||
while (await reader.WaitToReadAsync(_cts.Token)) |
|||
{ |
|||
var buffer = new List<TLog>(batchSize); |
|||
|
|||
while (buffer.Count < batchSize && reader.TryRead(out var item)) |
|||
{ |
|||
buffer.Add(item); |
|||
} |
|||
|
|||
if (buffer.Count > 0) |
|||
{ |
|||
batchSendTaskList.Add(SendBatchAsync(buffer, _cts.Token)); |
|||
if (batchSendTaskList.Count >= _maxConcurrency) |
|||
{ |
|||
try |
|||
{ |
|||
var completedTask = await Task.WhenAny(batchSendTaskList); |
|||
batchSendTaskList.Remove(completedTask); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "Background send {logName} task failed.", _logName); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
await Task.WhenAll(batchSendTaskList); |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
// ignore
|
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "Unexpected exception in {logName} consumer loop.", _logName); |
|||
} |
|||
} |
|||
|
|||
private async Task SendBatchAsync(List<TLog> batch, CancellationToken cancellationToken) |
|||
{ |
|||
if (batch == null || batch.Count == 0) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
await _flushSemaphore.WaitAsync(cancellationToken); |
|||
|
|||
try |
|||
{ |
|||
if (batch.Count == 1) |
|||
{ |
|||
await WriteAsync(batch[0], cancellationToken); |
|||
} |
|||
else |
|||
{ |
|||
await BulkWriteAsync(batch, cancellationToken); |
|||
} |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
_logger.LogDebug("SendBatchAsync canceled while writing {logName}.", _logName); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "Failed to write {logName} batch to Elasticsearch. Items={count}", _logName, batch.Count); |
|||
} |
|||
finally |
|||
{ |
|||
_flushSemaphore.Release(); |
|||
} |
|||
} |
|||
|
|||
protected abstract Task WriteAsync(TLog log, CancellationToken cancellationToken = default); |
|||
protected abstract Task BulkWriteAsync(IEnumerable<TLog> logs, CancellationToken cancellationToken = default); |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.Auditing; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public interface IAuditLogQueue |
|||
{ |
|||
Task EnqueueAsync(AuditLogInfo auditLogInfo, CancellationToken cancellationToken = default); |
|||
} |
|||
@ -0,0 +1,12 @@ |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.Auditing; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public interface IAuditLogWriter |
|||
{ |
|||
Task WriteAsync(AuditLogInfo auditLogInfo, CancellationToken cancellationToken = default); |
|||
|
|||
Task BulkWriteAsync(IEnumerable<AuditLogInfo> auditLogInfos, CancellationToken cancellationToken = default); |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.SecurityLog; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public interface ISecurityLogQueue |
|||
{ |
|||
Task EnqueueAsync(SecurityLogInfo securityLogInfo, CancellationToken cancellationToken = default); |
|||
} |
|||
@ -0,0 +1,12 @@ |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.SecurityLog; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public interface ISecurityLogWriter |
|||
{ |
|||
Task WriteAsync(SecurityLogInfo securityLogInfo, CancellationToken cancellationToken = default); |
|||
|
|||
Task BulkWriteAsync(IEnumerable<SecurityLogInfo> securityLogInfos, CancellationToken cancellationToken = default); |
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
using Microsoft.Extensions.Logging; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.Auditing; |
|||
using Volo.Abp.DependencyInjection; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
|
|||
[Dependency(TryRegister = true)] |
|||
public class LoggerAuditLogWriter : IAuditLogWriter, ISingletonDependency |
|||
{ |
|||
private readonly ILogger<LoggerAuditLogWriter> _logger; |
|||
public LoggerAuditLogWriter(ILogger<LoggerAuditLogWriter> logger) |
|||
{ |
|||
_logger = logger; |
|||
} |
|||
|
|||
public async virtual Task BulkWriteAsync(IEnumerable<AuditLogInfo> auditLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
foreach (var auditLogInfo in auditLogInfos) |
|||
{ |
|||
await WriteAsync(auditLogInfo, cancellationToken); |
|||
} |
|||
} |
|||
|
|||
public virtual Task WriteAsync(AuditLogInfo auditLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
_logger.LogInformation(auditLogInfo.ToString()); |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
using Microsoft.Extensions.Logging; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.SecurityLog; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
|
|||
[Dependency(TryRegister = true)] |
|||
public class LoggerSecurityLogWriter : ISecurityLogWriter, ISingletonDependency |
|||
{ |
|||
private readonly ILogger<LoggerSecurityLogWriter> _logger; |
|||
public LoggerSecurityLogWriter(ILogger<LoggerSecurityLogWriter> logger) |
|||
{ |
|||
_logger = logger; |
|||
} |
|||
|
|||
public async virtual Task BulkWriteAsync(IEnumerable<SecurityLogInfo> securityLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
foreach (var securityLogInfo in securityLogInfos) |
|||
{ |
|||
await WriteAsync(securityLogInfo, cancellationToken); |
|||
} |
|||
} |
|||
|
|||
public virtual Task WriteAsync(SecurityLogInfo securityLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
_logger.LogInformation(securityLogInfo.ToString()); |
|||
|
|||
return Task.CompletedTask; |
|||
} |
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
using Microsoft.Extensions.Logging; |
|||
using Microsoft.Extensions.Options; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using Volo.Abp.DependencyInjection; |
|||
using Volo.Abp.MultiTenancy; |
|||
using Volo.Abp.SecurityLog; |
|||
|
|||
namespace LINGYUN.Abp.AuditLogging; |
|||
public class SecurityLogQueue : AuditLoggingQueue<SecurityLogInfo>, ISecurityLogQueue, ISingletonDependency |
|||
{ |
|||
private readonly ISecurityLogWriter _securityLogWriter; |
|||
private readonly ICurrentTenant _currentTenant; |
|||
public SecurityLogQueue( |
|||
IOptions<AbpAuditLoggingOptions> options, |
|||
ISecurityLogWriter securityLogWriter, |
|||
ICurrentTenant currentTenant, |
|||
ILogger<SecurityLogQueue> logger) |
|||
: base( |
|||
"SecurityLog", |
|||
options.Value.MaxSecurityLogQueueSize, |
|||
options.Value.BatchSecurityLogSize, |
|||
logger) |
|||
{ |
|||
_securityLogWriter = securityLogWriter; |
|||
_currentTenant = currentTenant; |
|||
} |
|||
|
|||
protected async override Task BulkWriteAsync(IEnumerable<SecurityLogInfo> securityLogInfos, CancellationToken cancellationToken = default) |
|||
{ |
|||
var tenantSecurityLogGroup = securityLogInfos.GroupBy(x => x.TenantId); |
|||
foreach (var tenantSecurityLogs in tenantSecurityLogGroup) |
|||
{ |
|||
using (_currentTenant.Change(tenantSecurityLogs.Key)) |
|||
{ |
|||
await _securityLogWriter.BulkWriteAsync(tenantSecurityLogs, cancellationToken); |
|||
} |
|||
} |
|||
} |
|||
|
|||
protected async override Task WriteAsync(SecurityLogInfo securityLogInfo, CancellationToken cancellationToken = default) |
|||
{ |
|||
using (_currentTenant.Change(securityLogInfo.TenantId)) |
|||
{ |
|||
await _securityLogWriter.WriteAsync(securityLogInfo, cancellationToken); |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue