diff --git a/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
index 327cbce46..3b1fdadf2 100644
--- a/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
+++ b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
@@ -57,11 +57,12 @@ namespace LINGYUN.Abp.EventBus.CAP
cancellationToken.ThrowIfCancellationRequested();
var methodInfo = context.ConsumerDescriptor.MethodInfo;
- var reflectedType = methodInfo.ReflectedType.Name;
+ var reflectedTypeHandle = methodInfo.ReflectedType!.TypeHandle.Value;
+ var methodHandle = methodInfo.MethodHandle.Value;
+ var key = $"{reflectedTypeHandle}_{methodHandle}";
_logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name);
- var key = $"{methodInfo.Module.Name}_{reflectedType}_{methodInfo.MetadataToken}";
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
using var scope = _serviceProvider.CreateScope();
@@ -77,9 +78,10 @@ namespace LINGYUN.Abp.EventBus.CAP
var tenantId = message.GetTenantIdOrNull();
for (var i = 0; i < parameterDescriptors.Count; i++)
{
- if (parameterDescriptors[i].IsFromCap)
+ var parameterDescriptor = parameterDescriptors[i];
+ if (parameterDescriptor.IsFromCap)
{
- executeParameters[i] = new CapHeader(message.Headers);
+ executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken);
}
else
{
@@ -87,7 +89,7 @@ namespace LINGYUN.Abp.EventBus.CAP
{
if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json
{
- var eventData = _serializer.Deserialize(message.Value, parameterDescriptors[i].ParameterType);
+ var eventData = _serializer.Deserialize(message.Value, parameterDescriptor.ParameterType);
// 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant)
{
@@ -97,7 +99,7 @@ namespace LINGYUN.Abp.EventBus.CAP
}
else
{
- var converter = TypeDescriptor.GetConverter(parameterDescriptors[i].ParameterType);
+ var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
if (converter.CanConvertFrom(message.Value.GetType()))
{
var eventData = converter.ConvertFrom(message.Value);
@@ -110,7 +112,7 @@ namespace LINGYUN.Abp.EventBus.CAP
}
else
{
- if (parameterDescriptors[i].ParameterType.IsInstanceOfType(message.Value))
+ if (parameterDescriptor.ParameterType.IsInstanceOfType(message.Value))
{
// 租户数据也可能存在事件数据中
if (tenantId == null && message.Value is IMultiTenant tenant)
@@ -121,7 +123,7 @@ namespace LINGYUN.Abp.EventBus.CAP
}
else
{
- var eventData = Convert.ChangeType(message.Value, parameterDescriptors[i].ParameterType);
+ var eventData = Convert.ChangeType(message.Value, parameterDescriptor.ParameterType);
// 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant)
{
@@ -184,6 +186,29 @@ namespace LINGYUN.Abp.EventBus.CAP
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
}
}
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message,
+ CancellationToken cancellationToken)
+ {
+ if (typeof(CancellationToken).IsAssignableFrom(parameterDescriptor.ParameterType))
+ {
+ return cancellationToken;
+ }
+
+ if (parameterDescriptor.ParameterType.IsAssignableFrom(typeof(CapHeader)))
+ {
+ return new CapHeader(message.Headers);
+ }
+
+ throw new ArgumentException(parameterDescriptor.Name);
+ }
///
/// 获取事件处理类实例
///
diff --git a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs
index 467063899..e7c48b106 100644
--- a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs
+++ b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs
@@ -20,8 +20,11 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake
protected int TimestampLeftShift { get; }
protected long SequenceMask { get; }
+ protected SnowflakeIdOptions Options { get; }
+
private SnowflakeIdGenerator(SnowflakeIdOptions options)
{
+ Options = options;
WorkerIdShift = options.SequenceBits;
DatacenterIdShift = options.SequenceBits + options.WorkerIdBits;
TimestampLeftShift = options.SequenceBits + options.WorkerIdBits + options.DatacenterIdBits;
@@ -82,15 +85,24 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake
{
var timestamp = TimeGen();
- // TODO: 时间回退解决方案, 保存一个时间节点, 当服务器时间发生改变, 从保存的节点开始递增
- if (timestamp < _lastTimestamp)
- throw new Exception(
- $"InvalidSystemClock: Clock moved backwards, Refusing to generate id for {_lastTimestamp - timestamp} milliseconds");
+ if (timestamp < _lastTimestamp)
+ {
+ // 如果启用此选项, 发生时间回退时使用上一个时间戳
+ if (!Options.UsePreviousInTimeRollback)
+ {
+ throw new Exception(
+ $"InvalidSystemClock: Clock moved backwards, Refusing to generate id for {_lastTimestamp - timestamp} milliseconds");
+ }
+ timestamp = _lastTimestamp;
+ }
if (_lastTimestamp == timestamp)
{
Sequence = (Sequence + 1) & SequenceMask;
- if (Sequence == 0) timestamp = TilNextMillis(_lastTimestamp);
+ if (Sequence == 0L)
+ {
+ timestamp = TilNextMillis(_lastTimestamp);
+ }
}
else
{
@@ -100,7 +112,8 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake
_lastTimestamp = timestamp;
var id = ((timestamp - Twepoch) << TimestampLeftShift) |
(DatacenterId << DatacenterIdShift) |
- (WorkerId << WorkerIdShift) | Sequence;
+ (WorkerId << WorkerIdShift) |
+ Sequence;
return id;
}
diff --git a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs
index 289b6ade4..3de6d3818 100644
--- a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs
+++ b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs
@@ -24,6 +24,10 @@ public class SnowflakeIdOptions
public long Sequence { get; set; }
public int SequenceBits { get; set; }
+ ///
+ /// 发生时间回退时使用上一个ID
+ ///
+ public bool UsePreviousInTimeRollback { get; set; }
public SnowflakeIdOptions()
{
@@ -31,5 +35,6 @@ public class SnowflakeIdOptions
DatacenterIdBits = 5;
Sequence = 0L;
SequenceBits = 12;
+ UsePreviousInTimeRollback = true;
}
}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs
index 0eaa28718..622c0e1f8 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs
@@ -1,4 +1,5 @@
-using System.Collections.Concurrent;
+using System;
+using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp;
@@ -9,31 +10,47 @@ namespace LINGYUN.Abp.BackgroundTasks;
[Dependency(TryRegister = true)]
public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency
{
- private readonly ConcurrentDictionary _localSyncObjects = new();
+ private readonly ConcurrentDictionary _localSyncObjects = new();
public virtual Task TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));
- if (_localSyncObjects.ContainsKey(jobKey))
+ if (_localSyncObjects.TryGetValue(jobKey, out var jobLock))
{
- return Task.FromResult(false);
+ if (jobLock.ExpirationTime > DateTime.UtcNow)
+ {
+ return Task.FromResult(false);
+ }
+ jobLock.ExpirationTime = DateTime.UtcNow.AddSeconds(lockSeconds);
+ return Task.FromResult(true);
}
- var semaphore = new SemaphoreSlim(1, 1);
- return Task.FromResult(_localSyncObjects.TryAdd(jobKey, semaphore));
+ jobLock = new JobLock
+ {
+ ExpirationTime = DateTime.UtcNow.AddSeconds(lockSeconds),
+ Semaphore = new SemaphoreSlim(1, 1)
+ };
+
+ return Task.FromResult(_localSyncObjects.TryAdd(jobKey, jobLock));
}
public Task TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default)
{
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));
- if (_localSyncObjects.TryGetValue(jobKey, out var semaphore))
+ if (_localSyncObjects.TryGetValue(jobKey, out var jobLock))
{
- semaphore.Dispose();
+ jobLock.Semaphore.Dispose();
return Task.FromResult(_localSyncObjects.TryRemove(jobKey, out _));
}
return Task.FromResult(false);
}
+
+ private class JobLock
+ {
+ public DateTime ExpirationTime { get; set; }
+ public SemaphoreSlim Semaphore { get; set; }
+ }
}
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs
index 1923355db..a9ac515da 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs
@@ -42,9 +42,14 @@ public static class JobRunnableContextExtensions
}
public static bool TryGetMultiTenantId(this JobRunnableContext context, out Guid? tenantId)
+ {
+ return context.TryGetMultiTenantId(nameof(JobInfo.TenantId), out tenantId);
+ }
+
+ public static bool TryGetMultiTenantId(this JobRunnableContext context, string multiTenancyKey, out Guid? tenantId)
{
tenantId = null;
- if (context.TryGetString(nameof(JobInfo.TenantId), out var tenantUUIdString) &&
+ if (context.TryGetString(multiTenancyKey, out var tenantUUIdString) &&
Guid.TryParse(tenantUUIdString, out var tenantUUId))
{
tenantId = tenantUUId;
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs
index 54a0d5784..b2bc4ed8c 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs
@@ -43,21 +43,21 @@ public class HttpRequestJob : IJobRunnable
{
var url = context.GetString(PropertyUrl);
var method = context.GetString(PropertyMethod);
- context.TryGetJobData(PropertyData, out var data);
- context.TryGetJobData(PropertyContentType, out var contentType);
+ context.TryGetString(PropertyData, out var data);
+ context.TryGetString(PropertyContentType, out var contentType);
var jsonSerializer = context.GetRequiredService();
var httpRequestMesasge = new HttpRequestMessage(new HttpMethod(method), url);
- if (data != null)
+ if (!data.IsNullOrWhiteSpace())
{
// TODO: 需要支持表单类型
// application/json 支持
httpRequestMesasge.Content = new StringContent(
- jsonSerializer.Serialize(data),
+ data,
Encoding.UTF8,
- contentType?.ToString() ?? MimeTypes.Application.Json);
+ contentType ?? MimeTypes.Application.Json);
}
if (context.TryGetJobData(PropertyHeaders, out var headers))
{
diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
index 5d42369e8..604d6ad4d 100644
--- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
+++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs
@@ -19,8 +19,12 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
public Task> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default)
{
+ var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
+
var jobs = _memoryJobStore
- .Where(x => x.JobType == JobType.Period && x.Status == JobStatus.Running)
+ .Where(x => !x.IsAbandoned)
+ .Where(x => x.JobType == JobType.Period && status.Contains(x.Status))
+ .Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))
.OrderByDescending(x => x.Priority)
.ToList();
@@ -30,8 +34,12 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
public Task> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default)
{
var now = DateTime.Now;
+ var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
+
var jobs = _memoryJobStore
- .Where(x => !x.IsAbandoned && x.JobType != JobType.Period && x.Status == JobStatus.Running)
+ .Where(x => !x.IsAbandoned)
+ .Where(x => x.JobType != JobType.Period && status.Contains(x.Status))
+ .Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))
.OrderByDescending(x => x.Priority)
.ThenBy(x => x.TryCount)
.ThenBy(x => x.NextRunTime)
@@ -75,8 +83,8 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
{
var expiratime = DateTime.Now - jobExpiratime;
- var expriaJobs = _memoryJobStore.Where(
- x => x.Status == JobStatus.Completed &&
+ var expriaJobs = _memoryJobStore
+ .Where(x => x.Status == JobStatus.Completed &&
expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0)
.Take(maxResultCount);