|
|
@ -40,29 +40,37 @@ public class ElasticsearchEntityChangeStore : IEntityChangeStore, ITransientDepe |
|
|
{ |
|
|
{ |
|
|
var client = _clientFactory.Create(); |
|
|
var client = _clientFactory.Create(); |
|
|
|
|
|
|
|
|
var resposne = await client.SearchAsync<EntityChange>( |
|
|
var sortOrder = SortOrder.Descending; |
|
|
dsl => dsl.Index(CreateIndex()) |
|
|
|
|
|
.Query(query => |
|
|
var querys = BuildQueryDescriptor(entityChangeId: entityChangeId); |
|
|
query.Bool(bo => |
|
|
|
|
|
bo.Must(m => |
|
|
static SourceFilterDescriptor<AuditLog> SourceFilter(SourceFilterDescriptor<AuditLog> selector) |
|
|
m.Nested(n => |
|
|
|
|
|
n.InnerHits() |
|
|
|
|
|
.Path("EntityChanges") |
|
|
|
|
|
.Query(nq => |
|
|
|
|
|
nq.Term(nqt => |
|
|
|
|
|
nqt.Field(GetField(nameof(EntityChange.Id))).Value(entityChangeId))))))) |
|
|
|
|
|
.Source(x => x.Excludes(f => f.Field("*"))) |
|
|
|
|
|
.Sort(entity => entity.Field("EntityChanges.ChangeTime", SortOrder.Descending)) |
|
|
|
|
|
.Size(1), |
|
|
|
|
|
ct: cancellationToken); |
|
|
|
|
|
|
|
|
|
|
|
if (resposne.Shards.Successful > 0) |
|
|
|
|
|
{ |
|
|
{ |
|
|
var hits = resposne.Hits.FirstOrDefault(); |
|
|
selector.IncludeAll() |
|
|
if (hits.InnerHits.Count > 0) |
|
|
.Excludes(field => |
|
|
{ |
|
|
field.Field(f => f.Actions) |
|
|
return hits.InnerHits.First().Value.Documents<EntityChange>().FirstOrDefault(); |
|
|
.Field(f => f.Comments) |
|
|
} |
|
|
.Field(f => f.Exceptions)); |
|
|
|
|
|
|
|
|
|
|
|
return selector; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var response = await client.SearchAsync<AuditLog>(dsl => |
|
|
|
|
|
dsl.Index(CreateIndex()) |
|
|
|
|
|
.Query(log => log.Bool(b => b.Must(querys.ToArray()))) |
|
|
|
|
|
.Source(SourceFilter) |
|
|
|
|
|
.Sort(log => log.Field(GetField(nameof(EntityChange.ChangeTime)), sortOrder)) |
|
|
|
|
|
.From(0) |
|
|
|
|
|
.Size(1), |
|
|
|
|
|
cancellationToken); |
|
|
|
|
|
|
|
|
|
|
|
var auditLog = response.Documents.FirstOrDefault(); |
|
|
|
|
|
if (auditLog != null) |
|
|
|
|
|
{ |
|
|
|
|
|
return auditLog |
|
|
|
|
|
.EntityChanges |
|
|
|
|
|
.Select(e => e) |
|
|
|
|
|
.FirstOrDefault(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
return null; |
|
|
return null; |
|
|
@ -77,40 +85,39 @@ public class ElasticsearchEntityChangeStore : IEntityChangeStore, ITransientDepe |
|
|
string entityTypeFullName = null, |
|
|
string entityTypeFullName = null, |
|
|
CancellationToken cancellationToken = default) |
|
|
CancellationToken cancellationToken = default) |
|
|
{ |
|
|
{ |
|
|
await Task.CompletedTask; |
|
|
var client = _clientFactory.Create(); |
|
|
return 0; |
|
|
|
|
|
//var client = _clientFactory.Create();
|
|
|
var querys = BuildQueryDescriptor( |
|
|
|
|
|
auditLogId, |
|
|
//var querys = BuildQueryDescriptor(
|
|
|
startTime, |
|
|
// auditLogId,
|
|
|
endTime, |
|
|
// startTime,
|
|
|
changeType, |
|
|
// endTime,
|
|
|
entityId, |
|
|
// changeType,
|
|
|
entityTypeFullName); |
|
|
// entityId,
|
|
|
|
|
|
// entityTypeFullName);
|
|
|
SourceFilterDescriptor<AuditLog> SourceFilter(SourceFilterDescriptor<AuditLog> selector) |
|
|
|
|
|
{ |
|
|
//Func<QueryContainerDescriptor<EntityChange>, QueryContainer> selector = q => q.MatchAll();
|
|
|
return selector |
|
|
//if (querys.Count > 0)
|
|
|
.Includes(field => |
|
|
//{
|
|
|
field.Field(f => f.UserName) |
|
|
// selector = q => q.Bool(b => b.Must(querys.ToArray()));
|
|
|
.Field(f => f.EntityChanges)) |
|
|
//}
|
|
|
.Excludes(field => |
|
|
|
|
|
field.Field(f => f.Actions) |
|
|
//var response = await client.CountAsync<EntityChange>(dsl =>
|
|
|
.Field(f => f.Comments) |
|
|
// dsl.Index(CreateIndex())
|
|
|
.Field(f => f.Exceptions)); |
|
|
// .Query(q =>
|
|
|
} |
|
|
// q.Bool(b =>
|
|
|
|
|
|
// b.Must(m =>
|
|
|
var response = await client.SearchAsync<AuditLog>(dsl => |
|
|
// m.Nested(n =>
|
|
|
dsl.Index(CreateIndex()) |
|
|
// n.InnerHits(hit => hit.Source(s => s.ExcludeAll()))
|
|
|
.Query(log => log.Bool(b => b.Must(querys.ToArray()))) |
|
|
// .Path("EntityChanges")
|
|
|
.Source(SourceFilter) |
|
|
// .Query(selector)
|
|
|
.From(0) |
|
|
// )
|
|
|
.Size(1000), |
|
|
// )
|
|
|
cancellationToken); |
|
|
// )
|
|
|
|
|
|
// ),
|
|
|
var auditLogs = response.Documents.ToList(); |
|
|
// ct: cancellationToken);
|
|
|
|
|
|
|
|
|
return auditLogs.Sum(log => log.EntityChanges.Count); |
|
|
//return response.Count;
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async virtual Task<List<EntityChange>> GetListAsync( |
|
|
public async virtual Task<List<EntityChange>> GetListAsync( |
|
|
@ -126,72 +133,74 @@ public class ElasticsearchEntityChangeStore : IEntityChangeStore, ITransientDepe |
|
|
bool includeDetails = false, |
|
|
bool includeDetails = false, |
|
|
CancellationToken cancellationToken = default) |
|
|
CancellationToken cancellationToken = default) |
|
|
{ |
|
|
{ |
|
|
// TODO: 需要解决Nested格式数据返回方式
|
|
|
// TODO: 正确的索引可以避免性能损耗
|
|
|
|
|
|
|
|
|
//var client = _clientFactory.Create();
|
|
|
var result = new List<EntityChange>(); |
|
|
|
|
|
var client = _clientFactory.Create(); |
|
|
//var sortOrder = !sorting.IsNullOrWhiteSpace() && sorting.EndsWith("asc", StringComparison.InvariantCultureIgnoreCase)
|
|
|
|
|
|
// ? SortOrder.Ascending : SortOrder.Descending;
|
|
|
var sortOrder = !sorting.IsNullOrWhiteSpace() && sorting.EndsWith("asc", StringComparison.InvariantCultureIgnoreCase) |
|
|
//sorting = !sorting.IsNullOrWhiteSpace()
|
|
|
? SortOrder.Ascending : SortOrder.Descending; |
|
|
// ? sorting.Split()[0]
|
|
|
sorting = !sorting.IsNullOrWhiteSpace() |
|
|
// : nameof(EntityChange.ChangeTime);
|
|
|
? sorting.Split()[0] |
|
|
|
|
|
: nameof(EntityChange.ChangeTime); |
|
|
//var querys = BuildQueryDescriptor(
|
|
|
|
|
|
// auditLogId,
|
|
|
var querys = BuildQueryDescriptor( |
|
|
// startTime,
|
|
|
auditLogId, |
|
|
// endTime,
|
|
|
startTime, |
|
|
// changeType,
|
|
|
endTime, |
|
|
// entityId,
|
|
|
changeType, |
|
|
// entityTypeFullName);
|
|
|
entityId, |
|
|
|
|
|
entityTypeFullName); |
|
|
//SourceFilterDescriptor<EntityChange> SourceFilter(SourceFilterDescriptor<EntityChange> selector)
|
|
|
|
|
|
//{
|
|
|
SourceFilterDescriptor<AuditLog> SourceFilter(SourceFilterDescriptor<AuditLog> selector) |
|
|
// selector.Includes(GetEntityChangeSources());
|
|
|
{ |
|
|
// if (!includeDetails)
|
|
|
selector |
|
|
// {
|
|
|
.Includes(field => |
|
|
// selector.Excludes(field =>
|
|
|
field.Field(f => f.UserName) |
|
|
// field.Field("EntityChanges.PropertyChanges")
|
|
|
.Field(f => f.EntityChanges)); |
|
|
// .Field("EntityChanges.ExtraProperties"));
|
|
|
if (includeDetails) |
|
|
// }
|
|
|
{ |
|
|
|
|
|
selector.Includes(field => |
|
|
// return selector;
|
|
|
field.Field(f => f.Actions) |
|
|
//}
|
|
|
.Field(f => f.Comments) |
|
|
|
|
|
.Field(f => f.Exceptions)); |
|
|
//Func<QueryContainerDescriptor<EntityChange>, QueryContainer> selector = q => q.MatchAll();
|
|
|
} |
|
|
//if (querys.Count > 0)
|
|
|
|
|
|
//{
|
|
|
return selector; |
|
|
// selector = q => q.Bool(b => b.Must(querys.ToArray()));
|
|
|
} |
|
|
//}
|
|
|
|
|
|
|
|
|
var response = await client.SearchAsync<AuditLog>(dsl => |
|
|
//var response = await client.SearchAsync<EntityChange>(dsl =>
|
|
|
dsl.Index(CreateIndex()) |
|
|
// dsl.Index(CreateIndex())
|
|
|
.Query(log => log.Bool(b => b.Must(querys.ToArray()))) |
|
|
// .Query(q =>
|
|
|
.Source(SourceFilter) |
|
|
// q.Bool(b =>
|
|
|
.Sort(log => log.Field(GetField(nameof(EntityChange.ChangeTime)), sortOrder)) |
|
|
// b.Must(m =>
|
|
|
.From(0) |
|
|
// m.Nested(n =>
|
|
|
.Size(1000), |
|
|
// n.InnerHits(hit => hit.Source(SourceFilter))
|
|
|
cancellationToken); |
|
|
// .Path("EntityChanges")
|
|
|
|
|
|
// .Query(selector)
|
|
|
var auditLogs = response.Documents.ToList(); |
|
|
// )
|
|
|
if (auditLogs.Any()) |
|
|
// )
|
|
|
{ |
|
|
// )
|
|
|
var groupAuditLogs = auditLogs.GroupBy(log => log.UserName); |
|
|
// )
|
|
|
foreach (var group in groupAuditLogs) |
|
|
// .Source(x => x.Excludes(f => f.Field("*")))
|
|
|
{ |
|
|
// .Sort(entity => entity.Field(GetField(sorting), sortOrder))
|
|
|
var entityChangesList = group.Select(log => log.EntityChanges); |
|
|
// .From(skipCount)
|
|
|
|
|
|
// .Size(maxResultCount),
|
|
|
foreach (var entityChanges in entityChangesList) |
|
|
// cancellationToken);
|
|
|
{ |
|
|
|
|
|
foreach (var entityChange in entityChanges) |
|
|
//if (response.Shards.Successful > 0)
|
|
|
{ |
|
|
//{
|
|
|
result.Add(entityChange); |
|
|
// var hits = response.Hits.FirstOrDefault();
|
|
|
} |
|
|
// if (hits.InnerHits.Count > 0)
|
|
|
} |
|
|
// {
|
|
|
} |
|
|
// return hits.InnerHits.First().Value.Documents<EntityChange>().ToList();
|
|
|
} |
|
|
// }
|
|
|
|
|
|
//}
|
|
|
// TODO: 临时在内存中分页
|
|
|
await Task.CompletedTask; |
|
|
return result |
|
|
return new List<EntityChange>(); |
|
|
.AsQueryable() |
|
|
|
|
|
.PageBy(skipCount, maxResultCount) |
|
|
|
|
|
.ToList(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async virtual Task<EntityChangeWithUsername> GetWithUsernameAsync( |
|
|
public async virtual Task<EntityChangeWithUsername> GetWithUsernameAsync( |
|
|
@ -200,41 +209,43 @@ public class ElasticsearchEntityChangeStore : IEntityChangeStore, ITransientDepe |
|
|
{ |
|
|
{ |
|
|
var client = _clientFactory.Create(); |
|
|
var client = _clientFactory.Create(); |
|
|
|
|
|
|
|
|
var response = await client.SearchAsync<AuditLog>( |
|
|
var sortOrder = SortOrder.Descending; |
|
|
dsl => dsl.Index(CreateIndex()) |
|
|
|
|
|
.Query(query => |
|
|
|
|
|
query.Bool(bo => |
|
|
|
|
|
bo.Must(m => |
|
|
|
|
|
m.Nested(n => |
|
|
|
|
|
n.InnerHits() |
|
|
|
|
|
.Path("EntityChanges") |
|
|
|
|
|
.Query(nq => |
|
|
|
|
|
nq.Bool(nb => |
|
|
|
|
|
nb.Must(nm => |
|
|
|
|
|
nm.Term(nt => |
|
|
|
|
|
nt.Field(GetField(nameof(EntityChange.Id))).Value(entityChangeId))))))))) |
|
|
|
|
|
.Source(selector => selector.Includes(field => |
|
|
|
|
|
field.Field(f => f.UserName))) |
|
|
|
|
|
.Size(1), |
|
|
|
|
|
ct: cancellationToken); |
|
|
|
|
|
|
|
|
|
|
|
var auditLog = response.Documents.FirstOrDefault(); |
|
|
var querys = BuildQueryDescriptor(entityChangeId: entityChangeId); |
|
|
EntityChange entityChange = null; |
|
|
|
|
|
|
|
|
|
|
|
if (response.Shards.Successful > 0) |
|
|
static SourceFilterDescriptor<AuditLog> SourceFilter(SourceFilterDescriptor<AuditLog> selector) |
|
|
{ |
|
|
{ |
|
|
var hits = response.Hits.FirstOrDefault(); |
|
|
selector.IncludeAll() |
|
|
if (hits.InnerHits.Count > 0) |
|
|
.Excludes(field => |
|
|
{ |
|
|
field.Field(f => f.Actions) |
|
|
entityChange = hits.InnerHits.First().Value.Documents<EntityChange>().FirstOrDefault(); |
|
|
.Field(f => f.Comments) |
|
|
} |
|
|
.Field(f => f.Exceptions)); |
|
|
|
|
|
|
|
|
|
|
|
return selector; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
return new EntityChangeWithUsername() |
|
|
var response = await client.SearchAsync<AuditLog>(dsl => |
|
|
|
|
|
dsl.Index(CreateIndex()) |
|
|
|
|
|
.Query(log => log.Bool(b => b.Must(querys.ToArray()))) |
|
|
|
|
|
.Source(SourceFilter) |
|
|
|
|
|
.Sort(log => log.Field(GetField(nameof(EntityChange.ChangeTime)), sortOrder)) |
|
|
|
|
|
.From(0) |
|
|
|
|
|
.Size(1), |
|
|
|
|
|
cancellationToken); |
|
|
|
|
|
|
|
|
|
|
|
var auditLog = response.Documents.FirstOrDefault(); |
|
|
|
|
|
if (auditLog != null) |
|
|
{ |
|
|
{ |
|
|
EntityChange = entityChange, |
|
|
return auditLog.EntityChanges.Select(e => |
|
|
UserName = auditLog?.UserName |
|
|
new EntityChangeWithUsername |
|
|
}; |
|
|
{ |
|
|
|
|
|
UserName = auditLog.UserName, |
|
|
|
|
|
EntityChange = e |
|
|
|
|
|
}) |
|
|
|
|
|
.FirstOrDefault(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return null; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async virtual Task<List<EntityChangeWithUsername>> GetWithUsernameAsync( |
|
|
public async virtual Task<List<EntityChangeWithUsername>> GetWithUsernameAsync( |
|
|
@ -242,61 +253,69 @@ public class ElasticsearchEntityChangeStore : IEntityChangeStore, ITransientDepe |
|
|
string entityTypeFullName, |
|
|
string entityTypeFullName, |
|
|
CancellationToken cancellationToken = default) |
|
|
CancellationToken cancellationToken = default) |
|
|
{ |
|
|
{ |
|
|
|
|
|
var result = new List<EntityChangeWithUsername>(); |
|
|
var client = _clientFactory.Create(); |
|
|
var client = _clientFactory.Create(); |
|
|
|
|
|
|
|
|
var response = await client.SearchAsync<AuditLog>( |
|
|
var sortOrder = SortOrder.Descending; |
|
|
dsl => dsl.Index(CreateIndex()) |
|
|
|
|
|
.Query(query => |
|
|
var querys = BuildQueryDescriptor(entityId: entityId, entityTypeFullName: entityTypeFullName); |
|
|
query.Bool(bo => |
|
|
|
|
|
bo.Must(m => |
|
|
static SourceFilterDescriptor<AuditLog> SourceFilter(SourceFilterDescriptor<AuditLog> selector) |
|
|
m.Nested(n => |
|
|
{ |
|
|
n.InnerHits() |
|
|
selector.IncludeAll() |
|
|
.Path("EntityChanges") |
|
|
.Excludes(field => |
|
|
.Query(nq => |
|
|
field.Field(f => f.Actions) |
|
|
nq.Bool(nb => |
|
|
.Field(f => f.Comments) |
|
|
nb.Must(nm => |
|
|
.Field(f => f.Exceptions)); |
|
|
nm.Term(nt => |
|
|
|
|
|
nt.Field(GetField(nameof(EntityChange.EntityId))).Value(entityId)), |
|
|
return selector; |
|
|
nm => |
|
|
} |
|
|
nm.Term(nt => |
|
|
|
|
|
nt.Field(GetField(nameof(EntityChange.EntityTypeFullName))).Value(entityTypeFullName)) |
|
|
var response = await client.SearchAsync<AuditLog>(dsl => |
|
|
) |
|
|
dsl.Index(CreateIndex()) |
|
|
) |
|
|
.Query(log => log.Bool(b => b.Must(querys.ToArray()))) |
|
|
) |
|
|
.Source(SourceFilter) |
|
|
) |
|
|
.Sort(log => log.Field(GetField(nameof(EntityChange.ChangeTime)), sortOrder)) |
|
|
) |
|
|
.From(0) |
|
|
) |
|
|
.Size(100), |
|
|
) |
|
|
cancellationToken); |
|
|
.Source(selector => selector.Includes(field => |
|
|
|
|
|
field.Field(f => f.UserName))) |
|
|
var auditLogs = response.Documents.ToList(); |
|
|
.Sort(entity => entity.Field(f => f.ExecutionTime, SortOrder.Descending)), |
|
|
if (auditLogs.Any()) |
|
|
ct: cancellationToken); |
|
|
|
|
|
|
|
|
|
|
|
if (response.Hits.Count > 0) |
|
|
|
|
|
{ |
|
|
{ |
|
|
return response.Hits. |
|
|
var groupAuditLogs = auditLogs.GroupBy(log => log.UserName); |
|
|
Select(hit => new EntityChangeWithUsername |
|
|
foreach (var group in groupAuditLogs) |
|
|
|
|
|
{ |
|
|
|
|
|
var entityChangesList = group.Select(log => log.EntityChanges); |
|
|
|
|
|
|
|
|
|
|
|
foreach (var entityChanges in entityChangesList) |
|
|
{ |
|
|
{ |
|
|
UserName = hit.Source.UserName, |
|
|
foreach (var entityChange in entityChanges.Where(e => e.EntityId.Equals(entityId) && e.EntityTypeFullName.Equals(entityTypeFullName))) |
|
|
EntityChange = hit.InnerHits.Any() ? |
|
|
{ |
|
|
hit.InnerHits.First().Value.Documents<EntityChange>().FirstOrDefault() |
|
|
result.Add( |
|
|
: null |
|
|
new EntityChangeWithUsername |
|
|
}) |
|
|
{ |
|
|
.ToList(); |
|
|
UserName = group.Key, |
|
|
|
|
|
EntityChange = entityChange |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
return new List<EntityChangeWithUsername>(); |
|
|
return result; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
protected virtual List<Func<QueryContainerDescriptor<EntityChange>, QueryContainer>> BuildQueryDescriptor( |
|
|
protected virtual List<Func<QueryContainerDescriptor<AuditLog>, QueryContainer>> BuildQueryDescriptor( |
|
|
Guid? auditLogId = null, |
|
|
Guid? auditLogId = null, |
|
|
DateTime? startTime = null, |
|
|
DateTime? startTime = null, |
|
|
DateTime? endTime = null, |
|
|
DateTime? endTime = null, |
|
|
EntityChangeType? changeType = null, |
|
|
EntityChangeType? changeType = null, |
|
|
string entityId = null, |
|
|
string entityId = null, |
|
|
string entityTypeFullName = null) |
|
|
string entityTypeFullName = null, |
|
|
|
|
|
Guid? entityChangeId = null) |
|
|
{ |
|
|
{ |
|
|
var querys = new List<Func<QueryContainerDescriptor<EntityChange>, QueryContainer>>(); |
|
|
var querys = new List<Func<QueryContainerDescriptor<AuditLog>, QueryContainer>>(); |
|
|
|
|
|
|
|
|
if (auditLogId.HasValue) |
|
|
if (auditLogId.HasValue) |
|
|
{ |
|
|
{ |
|
|
@ -322,6 +341,10 @@ public class ElasticsearchEntityChangeStore : IEntityChangeStore, ITransientDepe |
|
|
{ |
|
|
{ |
|
|
querys.Add(entity => entity.Wildcard(q => q.Field(GetField(nameof(EntityChange.EntityTypeFullName))).Value($"*{entityTypeFullName}*"))); |
|
|
querys.Add(entity => entity.Wildcard(q => q.Field(GetField(nameof(EntityChange.EntityTypeFullName))).Value($"*{entityTypeFullName}*"))); |
|
|
} |
|
|
} |
|
|
|
|
|
if (entityChangeId.HasValue) |
|
|
|
|
|
{ |
|
|
|
|
|
querys.Add(entity => entity.Term(q => q.Field(GetField(nameof(EntityChange.Id))).Value(entityChangeId))); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
return querys; |
|
|
return querys; |
|
|
} |
|
|
} |
|
|
|