|
|
|
@ -2,7 +2,6 @@ |
|
|
|
using Microsoft.Extensions.Logging.Abstractions; |
|
|
|
using System; |
|
|
|
using System.Collections.Generic; |
|
|
|
using System.Data; |
|
|
|
using System.Linq; |
|
|
|
using System.Threading; |
|
|
|
using System.Threading.Tasks; |
|
|
|
@ -21,7 +20,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
|
|
|
|
private readonly ICurrentTenant _currentTenant; |
|
|
|
private readonly IGuidGenerator _guidGenerator; |
|
|
|
//private readonly IUnitOfWorkManager _unitOfWorkManager;
|
|
|
|
private readonly IWorkflowRepository _workflowRepository; |
|
|
|
private readonly IWorkflowEventRepository _workflowEventRepository; |
|
|
|
private readonly IWorkflowExecutionErrorRepository _executionErrorRepository; |
|
|
|
@ -35,7 +33,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
public AbpWorkflowPersistenceProvider( |
|
|
|
ICurrentTenant currentTenant, |
|
|
|
IGuidGenerator guidGenerator, |
|
|
|
//IUnitOfWorkManager unitOfWorkManager,
|
|
|
|
IAsyncQueryableExecuter asyncQueryableExecuter, |
|
|
|
IWorkflowRepository workflowRepository, |
|
|
|
IWorkflowEventRepository workflowEventRepository, |
|
|
|
@ -45,7 +42,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
{ |
|
|
|
_currentTenant = currentTenant; |
|
|
|
_guidGenerator = guidGenerator; |
|
|
|
//_unitOfWorkManager = unitOfWorkManager;
|
|
|
|
_asyncQueryableExecuter = asyncQueryableExecuter; |
|
|
|
|
|
|
|
_workflowRepository = workflowRepository; |
|
|
|
@ -62,8 +58,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
string token, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var uid = Guid.Parse(eventSubscriptionId); |
|
|
|
var existingEntity = await _subscriptionRepository.GetAsync(uid, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
@ -73,22 +67,16 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
existingEntity.SetSubscriptionToken(null, null, null); |
|
|
|
|
|
|
|
await _subscriptionRepository.UpdateAsync(existingEntity, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync();
|
|
|
|
} |
|
|
|
|
|
|
|
public virtual async Task<string> CreateEvent( |
|
|
|
Event newEvent, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var we = newEvent.ToPersistable(_guidGenerator, _currentTenant); |
|
|
|
|
|
|
|
await _workflowEventRepository.InsertAsync(we, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync();
|
|
|
|
|
|
|
|
newEvent.Id = we.Id.ToString(); |
|
|
|
|
|
|
|
return newEvent.Id; |
|
|
|
@ -98,14 +86,10 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
EventSubscription subscription, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var wes = subscription.ToPersistable(_guidGenerator, _currentTenant); |
|
|
|
|
|
|
|
await _subscriptionRepository.InsertAsync(wes, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync();
|
|
|
|
|
|
|
|
subscription.Id = wes.Id.ToString(); |
|
|
|
|
|
|
|
return subscription.Id; |
|
|
|
@ -115,14 +99,10 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
WorkflowInstance workflow, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var wf = workflow.ToPersistable(_guidGenerator, _currentTenant); |
|
|
|
|
|
|
|
await _workflowRepository.InsertAsync(wf, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync();
|
|
|
|
|
|
|
|
workflow.Id = wf.Id.ToString(); |
|
|
|
|
|
|
|
return workflow.Id; |
|
|
|
@ -133,7 +113,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
// TODO:
|
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<Event> GetEvent(string id, CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
var eventId = Guid.Parse(id); |
|
|
|
@ -143,7 +122,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return workflowEvent.ToEvent(); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<IEnumerable<string>> GetEvents( |
|
|
|
string eventName, |
|
|
|
string eventKey, |
|
|
|
@ -160,7 +138,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return workflowEventIds.Select(e => e.ToString()); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<EventSubscription> GetFirstOpenSubscription( |
|
|
|
string eventName, |
|
|
|
string eventKey, |
|
|
|
@ -175,7 +152,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return workflowEventSubscription?.ToEventSubscription(); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<IEnumerable<string>> GetRunnableEvents( |
|
|
|
DateTime asAt, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
@ -193,7 +169,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return workflowEventIdList.Select(e => e.ToString()); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<IEnumerable<string>> GetRunnableInstances( |
|
|
|
DateTime asAt, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
@ -209,7 +184,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return workflowIdList.Select(e => e.ToString()); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<EventSubscription> GetSubscription( |
|
|
|
string eventSubscriptionId, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
@ -220,7 +194,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return subscription?.ToEventSubscription(); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<IEnumerable<EventSubscription>> GetSubscriptions( |
|
|
|
string eventName, |
|
|
|
string eventKey, |
|
|
|
@ -236,7 +209,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return eventSubscriptions.Select(x => x.ToEventSubscription()); |
|
|
|
} |
|
|
|
|
|
|
|
[UnitOfWork(true, IsolationLevel.ReadUncommitted)] |
|
|
|
public virtual async Task<WorkflowInstance> GetWorkflowInstance( |
|
|
|
string Id, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
@ -250,7 +222,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return workflow?.ToWorkflowInstance(); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances( |
|
|
|
WorkflowStatus? status, |
|
|
|
string type, |
|
|
|
@ -264,7 +235,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
return workflows.Select(x => x.ToWorkflowInstance()); |
|
|
|
} |
|
|
|
|
|
|
|
//[UnitOfWork]
|
|
|
|
public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances( |
|
|
|
IEnumerable<string> ids, |
|
|
|
CancellationToken cancellationToken = default) |
|
|
|
@ -281,50 +251,36 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
|
|
|
|
public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var eventId = Guid.Parse(id); |
|
|
|
var workflowEvent = await _workflowEventRepository.GetAsync(eventId, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
workflowEvent.IsProcessed = true; |
|
|
|
|
|
|
|
await _workflowEventRepository.UpdateAsync(workflowEvent, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync(cancellationToken);
|
|
|
|
} |
|
|
|
|
|
|
|
public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var eventId = Guid.Parse(id); |
|
|
|
var workflowEvent = await _workflowEventRepository.GetAsync(eventId, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
workflowEvent.IsProcessed = false; |
|
|
|
|
|
|
|
await _workflowEventRepository.UpdateAsync(workflowEvent, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync(cancellationToken);
|
|
|
|
} |
|
|
|
|
|
|
|
public virtual async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
if (errors.Any()) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var workflowExecutionErrors = errors.Select(x => x.ToPersistable(_currentTenant)); |
|
|
|
|
|
|
|
await _executionErrorRepository.InsertManyAsync(workflowExecutionErrors, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync(cancellationToken);
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public virtual async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
if (!Guid.TryParse(workflow.Id, out Guid workflowId)) |
|
|
|
{ |
|
|
|
workflowId = _guidGenerator.Create(); |
|
|
|
@ -343,8 +299,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
|
|
|
|
await _workflowRepository.UpdateAsync(wf, cancellationToken: cancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync(cancellationToken);
|
|
|
|
} |
|
|
|
|
|
|
|
public virtual async Task ProcessCommands( |
|
|
|
@ -354,8 +308,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var quertable = await _scheduledCommandRepository.GetQueryableAsync(); |
|
|
|
var commands = await _asyncQueryableExecuter.ToListAsync( |
|
|
|
quertable.Where(x => x.ExecuteTime < asOf.UtcDateTime.Ticks), |
|
|
|
@ -367,8 +319,6 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
} |
|
|
|
|
|
|
|
await _scheduledCommandRepository.DeleteManyAsync(commands, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync(cancellationToken);
|
|
|
|
} |
|
|
|
catch(Exception ex) |
|
|
|
{ |
|
|
|
@ -381,13 +331,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
{ |
|
|
|
if (!await _scheduledCommandRepository.CheckExistsAsync(command.CommandName, command.Data)) |
|
|
|
{ |
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var workflowCommand = command.ToPersistable(_currentTenant); |
|
|
|
|
|
|
|
await _scheduledCommandRepository.InsertAsync(workflowCommand); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync();
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -400,16 +346,12 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
{ |
|
|
|
var uid = Guid.Parse(eventSubscriptionId); |
|
|
|
|
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var existingEntity = await _subscriptionRepository.GetAsync(uid, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
existingEntity.SetSubscriptionToken(token, workerId, expiry); |
|
|
|
|
|
|
|
await _subscriptionRepository.UpdateAsync(existingEntity, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync(cancellationToken);
|
|
|
|
|
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
@ -419,13 +361,9 @@ namespace LINGYUN.Abp.WorkflowCore.Persistence |
|
|
|
{ |
|
|
|
var uid = Guid.Parse(eventSubscriptionId); |
|
|
|
|
|
|
|
//using var unitOfWork = _unitOfWorkManager.Begin();
|
|
|
|
|
|
|
|
var existingEntity = await _subscriptionRepository.GetAsync(uid, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
await _subscriptionRepository.DeleteAsync(existingEntity, cancellationToken: cancellationToken); |
|
|
|
|
|
|
|
//await unitOfWork.SaveChangesAsync(cancellationToken);
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|