Browse Source

Improve performance

pull/10159/head
liangshiwei 4 years ago
parent
commit
f0f61884b7
  1. 26
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs
  2. 10
      framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs
  3. 28
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs
  4. 6
      framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs

26
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventInbox.cs

@ -57,35 +57,31 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
}
[UnitOfWork]
public async Task MarkAsProcessedAsync(Guid id)
public virtual async Task MarkAsProcessedAsync(Guid id)
{
//TODO: Optimize?
var dbContext = await DbContextProvider.GetDbContextAsync();
var incomingEvent = await dbContext.IncomingEvents.FindAsync(id);
if (incomingEvent != null)
{
incomingEvent.MarkAsProcessed(Clock.Now);
}
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"UPDATE {tableName} SET Processed = 1, ProcessedTime = '{Clock.Now}' WHERE Id = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
[UnitOfWork]
public async Task<bool> ExistsByMessageIdAsync(string messageId)
public virtual async Task<bool> ExistsByMessageIdAsync(string messageId)
{
//TODO: Optimize
var dbContext = await DbContextProvider.GetDbContextAsync();
return await dbContext.IncomingEvents.AnyAsync(x => x.MessageId == messageId);
}
[UnitOfWork]
public async Task DeleteOldEventsAsync()
public virtual async Task DeleteOldEventsAsync()
{
//TODO: Optimize
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);
var oldEvents = await dbContext.IncomingEvents
.Where(x => x.Processed && x.CreationTime < timeToKeepEvents)
.ToListAsync();
dbContext.IncomingEvents.RemoveRange(oldEvents);
var sql = $"DELETE FROM {tableName} WHERE Processed = 1 AND CreationTime < '{timeToKeepEvents}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

10
framework/src/Volo.Abp.EntityFrameworkCore/Volo/Abp/EntityFrameworkCore/DistributedEvents/DbContextEventOutbox.cs

@ -49,13 +49,11 @@ namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
[UnitOfWork]
public virtual async Task DeleteAsync(Guid id)
{
//TODO: Optimize?
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id);
if (outgoingEvent != null)
{
dbContext.Remove(outgoingEvent);
}
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();
var sql = $"DELETE FROM {tableName} WHERE Id = '{id}'";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}
}
}

28
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventInbox.cs

@ -68,34 +68,32 @@ namespace Volo.Abp.MongoDB.DistributedEvents
}
[UnitOfWork]
public async Task MarkAsProcessedAsync(Guid id)
public virtual async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var incomingEvent = await dbContext.IncomingEvents.Find(x => x.Id.Equals(id)).FirstOrDefaultAsync();
if (incomingEvent != null)
{
incomingEvent.MarkAsProcessed(Clock.Now);
if (dbContext.SessionHandle != null)
{
await dbContext.IncomingEvents.ReplaceOneAsync(dbContext.SessionHandle, Builders<IncomingEventRecord>.Filter.Eq(e => e.Id, incomingEvent.Id), incomingEvent);
}
else
{
await dbContext.IncomingEvents.ReplaceOneAsync(Builders<IncomingEventRecord>.Filter.Eq(e => e.Id, incomingEvent.Id), incomingEvent);
}
var filter = Builders<IncomingEventRecord>.Filter.Eq(x => x.Id, id);
var update = Builders<IncomingEventRecord>.Update.Set(x => x.Processed, true).Set(x => x.ProcessedTime, Clock.Now);
if (dbContext.SessionHandle != null)
{
await dbContext.IncomingEvents.UpdateOneAsync(dbContext.SessionHandle, filter, update);
}
else
{
await dbContext.IncomingEvents.UpdateOneAsync(filter, update);
}
}
[UnitOfWork]
public async Task<bool> ExistsByMessageIdAsync(string messageId)
public virtual async Task<bool> ExistsByMessageIdAsync(string messageId)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
return await dbContext.IncomingEvents.AsQueryable().AnyAsync(x => x.MessageId == messageId);
}
[UnitOfWork]
public async Task DeleteOldEventsAsync()
public virtual async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var timeToKeepEvents = Clock.Now.Add(DistributedEventsOptions.InboxKeepEventTimeSpan);

6
framework/src/Volo.Abp.MongoDB/Volo/Abp/MongoDB/DistributedEvents/MongoDbContextEventOutbox.cs

@ -21,7 +21,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents
}
[UnitOfWork]
public async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
{
var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync();
if (dbContext.SessionHandle != null)
@ -40,7 +40,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents
}
[UnitOfWork]
public async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
{
var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync(cancellationToken);
@ -56,7 +56,7 @@ namespace Volo.Abp.MongoDB.DistributedEvents
}
[UnitOfWork]
public async Task DeleteAsync(Guid id)
public virtual async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await MongoDbContextProvider.GetDbContextAsync();
if (dbContext.SessionHandle != null)

Loading…
Cancel
Save