From 86a26d962ad1be908cba846814e35a53e57dbc17 Mon Sep 17 00:00:00 2001 From: cKey <35512826+colinin@users.noreply.github.com> Date: Sat, 15 Jan 2022 17:28:12 +0800 Subject: [PATCH] =?UTF-8?q?fix(idgenerator):=20=E8=A7=A3=E5=86=B3=E9=9B=AA?= =?UTF-8?q?=E8=8A=B1=E7=AE=97=E6=B3=95=E6=97=B6=E9=92=9F=E5=9B=9E=E9=80=80?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EventBus/CAP/AbpCAPSubscribeInvoker.cs | 41 +++++++++++++++---- .../Snowflake/SnowflakeIdGenerator.cs | 25 ++++++++--- .../Snowflake/SnowflakeIdOptions.cs | 5 +++ .../BackgroundTasks/DefaultJobLockProvider.cs | 33 +++++++++++---- .../JobRunnableContextExtensions.cs | 7 +++- .../BackgroundTasks/Jobs/HttpRequestJob.cs | 10 ++--- .../Internal/InMemoryJobStore.cs | 16 ++++++-- 7 files changed, 105 insertions(+), 32 deletions(-) diff --git a/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs index 327cbce46..3b1fdadf2 100644 --- a/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs +++ b/aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs @@ -57,11 +57,12 @@ namespace LINGYUN.Abp.EventBus.CAP cancellationToken.ThrowIfCancellationRequested(); var methodInfo = context.ConsumerDescriptor.MethodInfo; - var reflectedType = methodInfo.ReflectedType.Name; + var reflectedTypeHandle = methodInfo.ReflectedType!.TypeHandle.Value; + var methodHandle = methodInfo.MethodHandle.Value; + var key = $"{reflectedTypeHandle}_{methodHandle}"; _logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name); - var key = $"{methodInfo.Module.Name}_{reflectedType}_{methodInfo.MetadataToken}"; var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo)); using var scope = _serviceProvider.CreateScope(); @@ -77,9 +78,10 @@ namespace LINGYUN.Abp.EventBus.CAP var tenantId = message.GetTenantIdOrNull(); for (var i = 0; i < parameterDescriptors.Count; i++) { - if (parameterDescriptors[i].IsFromCap) + var parameterDescriptor = parameterDescriptors[i]; + if (parameterDescriptor.IsFromCap) { - executeParameters[i] = new CapHeader(message.Headers); + executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken); } else { @@ -87,7 +89,7 @@ namespace LINGYUN.Abp.EventBus.CAP { if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json { - var eventData = _serializer.Deserialize(message.Value, parameterDescriptors[i].ParameterType); + var eventData = _serializer.Deserialize(message.Value, parameterDescriptor.ParameterType); // 租户数据也可能存在事件数据中 if (tenantId == null && eventData is IMultiTenant tenant) { @@ -97,7 +99,7 @@ namespace LINGYUN.Abp.EventBus.CAP } else { - var converter = TypeDescriptor.GetConverter(parameterDescriptors[i].ParameterType); + var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType); if (converter.CanConvertFrom(message.Value.GetType())) { var eventData = converter.ConvertFrom(message.Value); @@ -110,7 +112,7 @@ namespace LINGYUN.Abp.EventBus.CAP } else { - if (parameterDescriptors[i].ParameterType.IsInstanceOfType(message.Value)) + if (parameterDescriptor.ParameterType.IsInstanceOfType(message.Value)) { // 租户数据也可能存在事件数据中 if (tenantId == null && message.Value is IMultiTenant tenant) @@ -121,7 +123,7 @@ namespace LINGYUN.Abp.EventBus.CAP } else { - var eventData = Convert.ChangeType(message.Value, parameterDescriptors[i].ParameterType); + var eventData = Convert.ChangeType(message.Value, parameterDescriptor.ParameterType); // 租户数据也可能存在事件数据中 if (tenantId == null && eventData is IMultiTenant tenant) { @@ -184,6 +186,29 @@ namespace LINGYUN.Abp.EventBus.CAP return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName()); } } + /// + /// + /// + /// + /// + /// + /// + /// + private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message, + CancellationToken cancellationToken) + { + if (typeof(CancellationToken).IsAssignableFrom(parameterDescriptor.ParameterType)) + { + return cancellationToken; + } + + if (parameterDescriptor.ParameterType.IsAssignableFrom(typeof(CapHeader))) + { + return new CapHeader(message.Headers); + } + + throw new ArgumentException(parameterDescriptor.Name); + } /// /// 获取事件处理类实例 /// diff --git a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs index 467063899..e7c48b106 100644 --- a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs +++ b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs @@ -20,8 +20,11 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake protected int TimestampLeftShift { get; } protected long SequenceMask { get; } + protected SnowflakeIdOptions Options { get; } + private SnowflakeIdGenerator(SnowflakeIdOptions options) { + Options = options; WorkerIdShift = options.SequenceBits; DatacenterIdShift = options.SequenceBits + options.WorkerIdBits; TimestampLeftShift = options.SequenceBits + options.WorkerIdBits + options.DatacenterIdBits; @@ -82,15 +85,24 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake { var timestamp = TimeGen(); - // TODO: 时间回退解决方案, 保存一个时间节点, 当服务器时间发生改变, 从保存的节点开始递增 - if (timestamp < _lastTimestamp) - throw new Exception( - $"InvalidSystemClock: Clock moved backwards, Refusing to generate id for {_lastTimestamp - timestamp} milliseconds"); + if (timestamp < _lastTimestamp) + { + // 如果启用此选项, 发生时间回退时使用上一个时间戳 + if (!Options.UsePreviousInTimeRollback) + { + throw new Exception( + $"InvalidSystemClock: Clock moved backwards, Refusing to generate id for {_lastTimestamp - timestamp} milliseconds"); + } + timestamp = _lastTimestamp; + } if (_lastTimestamp == timestamp) { Sequence = (Sequence + 1) & SequenceMask; - if (Sequence == 0) timestamp = TilNextMillis(_lastTimestamp); + if (Sequence == 0L) + { + timestamp = TilNextMillis(_lastTimestamp); + } } else { @@ -100,7 +112,8 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake _lastTimestamp = timestamp; var id = ((timestamp - Twepoch) << TimestampLeftShift) | (DatacenterId << DatacenterIdShift) | - (WorkerId << WorkerIdShift) | Sequence; + (WorkerId << WorkerIdShift) | + Sequence; return id; } diff --git a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs index 289b6ade4..3de6d3818 100644 --- a/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs +++ b/aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs @@ -24,6 +24,10 @@ public class SnowflakeIdOptions public long Sequence { get; set; } public int SequenceBits { get; set; } + /// + /// 发生时间回退时使用上一个ID + /// + public bool UsePreviousInTimeRollback { get; set; } public SnowflakeIdOptions() { @@ -31,5 +35,6 @@ public class SnowflakeIdOptions DatacenterIdBits = 5; Sequence = 0L; SequenceBits = 12; + UsePreviousInTimeRollback = true; } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs index 0eaa28718..622c0e1f8 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs @@ -1,4 +1,5 @@ -using System.Collections.Concurrent; +using System; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using Volo.Abp; @@ -9,31 +10,47 @@ namespace LINGYUN.Abp.BackgroundTasks; [Dependency(TryRegister = true)] public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency { - private readonly ConcurrentDictionary _localSyncObjects = new(); + private readonly ConcurrentDictionary _localSyncObjects = new(); public virtual Task TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default) { Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); - if (_localSyncObjects.ContainsKey(jobKey)) + if (_localSyncObjects.TryGetValue(jobKey, out var jobLock)) { - return Task.FromResult(false); + if (jobLock.ExpirationTime > DateTime.UtcNow) + { + return Task.FromResult(false); + } + jobLock.ExpirationTime = DateTime.UtcNow.AddSeconds(lockSeconds); + return Task.FromResult(true); } - var semaphore = new SemaphoreSlim(1, 1); - return Task.FromResult(_localSyncObjects.TryAdd(jobKey, semaphore)); + jobLock = new JobLock + { + ExpirationTime = DateTime.UtcNow.AddSeconds(lockSeconds), + Semaphore = new SemaphoreSlim(1, 1) + }; + + return Task.FromResult(_localSyncObjects.TryAdd(jobKey, jobLock)); } public Task TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default) { Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); - if (_localSyncObjects.TryGetValue(jobKey, out var semaphore)) + if (_localSyncObjects.TryGetValue(jobKey, out var jobLock)) { - semaphore.Dispose(); + jobLock.Semaphore.Dispose(); return Task.FromResult(_localSyncObjects.TryRemove(jobKey, out _)); } return Task.FromResult(false); } + + private class JobLock + { + public DateTime ExpirationTime { get; set; } + public SemaphoreSlim Semaphore { get; set; } + } } diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs index 1923355db..a9ac515da 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs @@ -42,9 +42,14 @@ public static class JobRunnableContextExtensions } public static bool TryGetMultiTenantId(this JobRunnableContext context, out Guid? tenantId) + { + return context.TryGetMultiTenantId(nameof(JobInfo.TenantId), out tenantId); + } + + public static bool TryGetMultiTenantId(this JobRunnableContext context, string multiTenancyKey, out Guid? tenantId) { tenantId = null; - if (context.TryGetString(nameof(JobInfo.TenantId), out var tenantUUIdString) && + if (context.TryGetString(multiTenancyKey, out var tenantUUIdString) && Guid.TryParse(tenantUUIdString, out var tenantUUId)) { tenantId = tenantUUId; diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs index 54a0d5784..b2bc4ed8c 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs @@ -43,21 +43,21 @@ public class HttpRequestJob : IJobRunnable { var url = context.GetString(PropertyUrl); var method = context.GetString(PropertyMethod); - context.TryGetJobData(PropertyData, out var data); - context.TryGetJobData(PropertyContentType, out var contentType); + context.TryGetString(PropertyData, out var data); + context.TryGetString(PropertyContentType, out var contentType); var jsonSerializer = context.GetRequiredService(); var httpRequestMesasge = new HttpRequestMessage(new HttpMethod(method), url); - if (data != null) + if (!data.IsNullOrWhiteSpace()) { // TODO: 需要支持表单类型 // application/json 支持 httpRequestMesasge.Content = new StringContent( - jsonSerializer.Serialize(data), + data, Encoding.UTF8, - contentType?.ToString() ?? MimeTypes.Application.Json); + contentType ?? MimeTypes.Application.Json); } if (context.TryGetJobData(PropertyHeaders, out var headers)) { diff --git a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs index 5d42369e8..604d6ad4d 100644 --- a/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs +++ b/aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs @@ -19,8 +19,12 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency public Task> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default) { + var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; + var jobs = _memoryJobStore - .Where(x => x.JobType == JobType.Period && x.Status == JobStatus.Running) + .Where(x => !x.IsAbandoned) + .Where(x => x.JobType == JobType.Period && status.Contains(x.Status)) + .Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount)) .OrderByDescending(x => x.Priority) .ToList(); @@ -30,8 +34,12 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency public Task> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default) { var now = DateTime.Now; + var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry }; + var jobs = _memoryJobStore - .Where(x => !x.IsAbandoned && x.JobType != JobType.Period && x.Status == JobStatus.Running) + .Where(x => !x.IsAbandoned) + .Where(x => x.JobType != JobType.Period && status.Contains(x.Status)) + .Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount)) .OrderByDescending(x => x.Priority) .ThenBy(x => x.TryCount) .ThenBy(x => x.NextRunTime) @@ -75,8 +83,8 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency { var expiratime = DateTime.Now - jobExpiratime; - var expriaJobs = _memoryJobStore.Where( - x => x.Status == JobStatus.Completed && + var expriaJobs = _memoryJobStore + .Where(x => x.Status == JobStatus.Completed && expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0) .Take(maxResultCount);