Browse Source

Merge pull request #471 from colinin/5.1.1

fix(idgenerator): 解决雪花算法时钟回退问题
pull/474/head
yx lin 4 years ago
committed by GitHub
parent
commit
2e3521d61d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
  2. 25
      aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs
  3. 5
      aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs
  4. 33
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs
  5. 7
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs
  6. 10
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs
  7. 16
      aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs

41
aspnet-core/modules/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs

@ -57,11 +57,12 @@ namespace LINGYUN.Abp.EventBus.CAP
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
var methodInfo = context.ConsumerDescriptor.MethodInfo; var methodInfo = context.ConsumerDescriptor.MethodInfo;
var reflectedType = methodInfo.ReflectedType.Name; var reflectedTypeHandle = methodInfo.ReflectedType!.TypeHandle.Value;
var methodHandle = methodInfo.MethodHandle.Value;
var key = $"{reflectedTypeHandle}_{methodHandle}";
_logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name); _logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name);
var key = $"{methodInfo.Module.Name}_{reflectedType}_{methodInfo.MetadataToken}";
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo)); var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
using var scope = _serviceProvider.CreateScope(); using var scope = _serviceProvider.CreateScope();
@ -77,9 +78,10 @@ namespace LINGYUN.Abp.EventBus.CAP
var tenantId = message.GetTenantIdOrNull(); var tenantId = message.GetTenantIdOrNull();
for (var i = 0; i < parameterDescriptors.Count; i++) for (var i = 0; i < parameterDescriptors.Count; i++)
{ {
if (parameterDescriptors[i].IsFromCap) var parameterDescriptor = parameterDescriptors[i];
if (parameterDescriptor.IsFromCap)
{ {
executeParameters[i] = new CapHeader(message.Headers); executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken);
} }
else else
{ {
@ -87,7 +89,7 @@ namespace LINGYUN.Abp.EventBus.CAP
{ {
if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json
{ {
var eventData = _serializer.Deserialize(message.Value, parameterDescriptors[i].ParameterType); var eventData = _serializer.Deserialize(message.Value, parameterDescriptor.ParameterType);
// 租户数据也可能存在事件数据中 // 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant) if (tenantId == null && eventData is IMultiTenant tenant)
{ {
@ -97,7 +99,7 @@ namespace LINGYUN.Abp.EventBus.CAP
} }
else else
{ {
var converter = TypeDescriptor.GetConverter(parameterDescriptors[i].ParameterType); var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
if (converter.CanConvertFrom(message.Value.GetType())) if (converter.CanConvertFrom(message.Value.GetType()))
{ {
var eventData = converter.ConvertFrom(message.Value); var eventData = converter.ConvertFrom(message.Value);
@ -110,7 +112,7 @@ namespace LINGYUN.Abp.EventBus.CAP
} }
else else
{ {
if (parameterDescriptors[i].ParameterType.IsInstanceOfType(message.Value)) if (parameterDescriptor.ParameterType.IsInstanceOfType(message.Value))
{ {
// 租户数据也可能存在事件数据中 // 租户数据也可能存在事件数据中
if (tenantId == null && message.Value is IMultiTenant tenant) if (tenantId == null && message.Value is IMultiTenant tenant)
@ -121,7 +123,7 @@ namespace LINGYUN.Abp.EventBus.CAP
} }
else else
{ {
var eventData = Convert.ChangeType(message.Value, parameterDescriptors[i].ParameterType); var eventData = Convert.ChangeType(message.Value, parameterDescriptor.ParameterType);
// 租户数据也可能存在事件数据中 // 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant) if (tenantId == null && eventData is IMultiTenant tenant)
{ {
@ -184,6 +186,29 @@ namespace LINGYUN.Abp.EventBus.CAP
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName()); return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
} }
} }
/// <summary>
///
/// </summary>
/// <param name="parameterDescriptor"></param>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message,
CancellationToken cancellationToken)
{
if (typeof(CancellationToken).IsAssignableFrom(parameterDescriptor.ParameterType))
{
return cancellationToken;
}
if (parameterDescriptor.ParameterType.IsAssignableFrom(typeof(CapHeader)))
{
return new CapHeader(message.Headers);
}
throw new ArgumentException(parameterDescriptor.Name);
}
/// <summary> /// <summary>
/// 获取事件处理类实例 /// 获取事件处理类实例
/// </summary> /// </summary>

25
aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdGenerator.cs

@ -20,8 +20,11 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake
protected int TimestampLeftShift { get; } protected int TimestampLeftShift { get; }
protected long SequenceMask { get; } protected long SequenceMask { get; }
protected SnowflakeIdOptions Options { get; }
private SnowflakeIdGenerator(SnowflakeIdOptions options) private SnowflakeIdGenerator(SnowflakeIdOptions options)
{ {
Options = options;
WorkerIdShift = options.SequenceBits; WorkerIdShift = options.SequenceBits;
DatacenterIdShift = options.SequenceBits + options.WorkerIdBits; DatacenterIdShift = options.SequenceBits + options.WorkerIdBits;
TimestampLeftShift = options.SequenceBits + options.WorkerIdBits + options.DatacenterIdBits; TimestampLeftShift = options.SequenceBits + options.WorkerIdBits + options.DatacenterIdBits;
@ -82,15 +85,24 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake
{ {
var timestamp = TimeGen(); var timestamp = TimeGen();
// TODO: 时间回退解决方案, 保存一个时间节点, 当服务器时间发生改变, 从保存的节点开始递增 if (timestamp < _lastTimestamp)
if (timestamp < _lastTimestamp) {
throw new Exception( // 如果启用此选项, 发生时间回退时使用上一个时间戳
$"InvalidSystemClock: Clock moved backwards, Refusing to generate id for {_lastTimestamp - timestamp} milliseconds"); if (!Options.UsePreviousInTimeRollback)
{
throw new Exception(
$"InvalidSystemClock: Clock moved backwards, Refusing to generate id for {_lastTimestamp - timestamp} milliseconds");
}
timestamp = _lastTimestamp;
}
if (_lastTimestamp == timestamp) if (_lastTimestamp == timestamp)
{ {
Sequence = (Sequence + 1) & SequenceMask; Sequence = (Sequence + 1) & SequenceMask;
if (Sequence == 0) timestamp = TilNextMillis(_lastTimestamp); if (Sequence == 0L)
{
timestamp = TilNextMillis(_lastTimestamp);
}
} }
else else
{ {
@ -100,7 +112,8 @@ namespace LINGYUN.Abp.IdGenerator.Snowflake
_lastTimestamp = timestamp; _lastTimestamp = timestamp;
var id = ((timestamp - Twepoch) << TimestampLeftShift) | var id = ((timestamp - Twepoch) << TimestampLeftShift) |
(DatacenterId << DatacenterIdShift) | (DatacenterId << DatacenterIdShift) |
(WorkerId << WorkerIdShift) | Sequence; (WorkerId << WorkerIdShift) |
Sequence;
return id; return id;
} }

5
aspnet-core/modules/common/LINGYUN.Abp.IdGenerator/LINGYUN/Abp/IdGenerator/Snowflake/SnowflakeIdOptions.cs

@ -24,6 +24,10 @@ public class SnowflakeIdOptions
public long Sequence { get; set; } public long Sequence { get; set; }
public int SequenceBits { get; set; } public int SequenceBits { get; set; }
/// <summary>
/// 发生时间回退时使用上一个ID
/// </summary>
public bool UsePreviousInTimeRollback { get; set; }
public SnowflakeIdOptions() public SnowflakeIdOptions()
{ {
@ -31,5 +35,6 @@ public class SnowflakeIdOptions
DatacenterIdBits = 5; DatacenterIdBits = 5;
Sequence = 0L; Sequence = 0L;
SequenceBits = 12; SequenceBits = 12;
UsePreviousInTimeRollback = true;
} }
} }

33
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/DefaultJobLockProvider.cs

@ -1,4 +1,5 @@
using System.Collections.Concurrent; using System;
using System.Collections.Concurrent;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp; using Volo.Abp;
@ -9,31 +10,47 @@ namespace LINGYUN.Abp.BackgroundTasks;
[Dependency(TryRegister = true)] [Dependency(TryRegister = true)]
public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency public class DefaultJobLockProvider : IJobLockProvider, ISingletonDependency
{ {
private readonly ConcurrentDictionary<string, SemaphoreSlim> _localSyncObjects = new(); private readonly ConcurrentDictionary<string, JobLock> _localSyncObjects = new();
public virtual Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default) public virtual Task<bool> TryLockAsync(string jobKey, int lockSeconds, CancellationToken cancellationToken = default)
{ {
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));
if (_localSyncObjects.ContainsKey(jobKey)) if (_localSyncObjects.TryGetValue(jobKey, out var jobLock))
{ {
return Task.FromResult(false); if (jobLock.ExpirationTime > DateTime.UtcNow)
{
return Task.FromResult(false);
}
jobLock.ExpirationTime = DateTime.UtcNow.AddSeconds(lockSeconds);
return Task.FromResult(true);
} }
var semaphore = new SemaphoreSlim(1, 1); jobLock = new JobLock
return Task.FromResult(_localSyncObjects.TryAdd(jobKey, semaphore)); {
ExpirationTime = DateTime.UtcNow.AddSeconds(lockSeconds),
Semaphore = new SemaphoreSlim(1, 1)
};
return Task.FromResult(_localSyncObjects.TryAdd(jobKey, jobLock));
} }
public Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default) public Task<bool> TryReleaseAsync(string jobKey, CancellationToken cancellationToken = default)
{ {
Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey)); Check.NotNullOrWhiteSpace(jobKey, nameof(jobKey));
if (_localSyncObjects.TryGetValue(jobKey, out var semaphore)) if (_localSyncObjects.TryGetValue(jobKey, out var jobLock))
{ {
semaphore.Dispose(); jobLock.Semaphore.Dispose();
return Task.FromResult(_localSyncObjects.TryRemove(jobKey, out _)); return Task.FromResult(_localSyncObjects.TryRemove(jobKey, out _));
} }
return Task.FromResult(false); return Task.FromResult(false);
} }
private class JobLock
{
public DateTime ExpirationTime { get; set; }
public SemaphoreSlim Semaphore { get; set; }
}
} }

7
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Abstractions/LINGYUN/Abp/BackgroundTasks/JobRunnableContextExtensions.cs

@ -42,9 +42,14 @@ public static class JobRunnableContextExtensions
} }
public static bool TryGetMultiTenantId(this JobRunnableContext context, out Guid? tenantId) public static bool TryGetMultiTenantId(this JobRunnableContext context, out Guid? tenantId)
{
return context.TryGetMultiTenantId(nameof(JobInfo.TenantId), out tenantId);
}
public static bool TryGetMultiTenantId(this JobRunnableContext context, string multiTenancyKey, out Guid? tenantId)
{ {
tenantId = null; tenantId = null;
if (context.TryGetString(nameof(JobInfo.TenantId), out var tenantUUIdString) && if (context.TryGetString(multiTenancyKey, out var tenantUUIdString) &&
Guid.TryParse(tenantUUIdString, out var tenantUUId)) Guid.TryParse(tenantUUIdString, out var tenantUUId))
{ {
tenantId = tenantUUId; tenantId = tenantUUId;

10
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks.Jobs/LINGYUN/Abp/BackgroundTasks/Jobs/HttpRequestJob.cs

@ -43,21 +43,21 @@ public class HttpRequestJob : IJobRunnable
{ {
var url = context.GetString(PropertyUrl); var url = context.GetString(PropertyUrl);
var method = context.GetString(PropertyMethod); var method = context.GetString(PropertyMethod);
context.TryGetJobData(PropertyData, out var data); context.TryGetString(PropertyData, out var data);
context.TryGetJobData(PropertyContentType, out var contentType); context.TryGetString(PropertyContentType, out var contentType);
var jsonSerializer = context.GetRequiredService<IJsonSerializer>(); var jsonSerializer = context.GetRequiredService<IJsonSerializer>();
var httpRequestMesasge = new HttpRequestMessage(new HttpMethod(method), url); var httpRequestMesasge = new HttpRequestMessage(new HttpMethod(method), url);
if (data != null) if (!data.IsNullOrWhiteSpace())
{ {
// TODO: 需要支持表单类型 // TODO: 需要支持表单类型
// application/json 支持 // application/json 支持
httpRequestMesasge.Content = new StringContent( httpRequestMesasge.Content = new StringContent(
jsonSerializer.Serialize(data), data,
Encoding.UTF8, Encoding.UTF8,
contentType?.ToString() ?? MimeTypes.Application.Json); contentType ?? MimeTypes.Application.Json);
} }
if (context.TryGetJobData(PropertyHeaders, out var headers)) if (context.TryGetJobData(PropertyHeaders, out var headers))
{ {

16
aspnet-core/modules/task-management/LINGYUN.Abp.BackgroundTasks/LINGYUN/Abp/BackgroundTasks/Internal/InMemoryJobStore.cs

@ -19,8 +19,12 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
public Task<List<JobInfo>> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default) public Task<List<JobInfo>> GetAllPeriodTasksAsync(CancellationToken cancellationToken = default)
{ {
var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
var jobs = _memoryJobStore var jobs = _memoryJobStore
.Where(x => x.JobType == JobType.Period && x.Status == JobStatus.Running) .Where(x => !x.IsAbandoned)
.Where(x => x.JobType == JobType.Period && status.Contains(x.Status))
.Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))
.OrderByDescending(x => x.Priority) .OrderByDescending(x => x.Priority)
.ToList(); .ToList();
@ -30,8 +34,12 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
public Task<List<JobInfo>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default) public Task<List<JobInfo>> GetWaitingListAsync(int maxResultCount, CancellationToken cancellationToken = default)
{ {
var now = DateTime.Now; var now = DateTime.Now;
var status = new JobStatus[] { JobStatus.Running, JobStatus.FailedRetry };
var jobs = _memoryJobStore var jobs = _memoryJobStore
.Where(x => !x.IsAbandoned && x.JobType != JobType.Period && x.Status == JobStatus.Running) .Where(x => !x.IsAbandoned)
.Where(x => x.JobType != JobType.Period && status.Contains(x.Status))
.Where(x => (x.MaxCount == 0 || x.TriggerCount < x.MaxCount) || (x.MaxTryCount == 0 || x.TryCount < x.MaxTryCount))
.OrderByDescending(x => x.Priority) .OrderByDescending(x => x.Priority)
.ThenBy(x => x.TryCount) .ThenBy(x => x.TryCount)
.ThenBy(x => x.NextRunTime) .ThenBy(x => x.NextRunTime)
@ -75,8 +83,8 @@ internal class InMemoryJobStore : IJobStore, ISingletonDependency
{ {
var expiratime = DateTime.Now - jobExpiratime; var expiratime = DateTime.Now - jobExpiratime;
var expriaJobs = _memoryJobStore.Where( var expriaJobs = _memoryJobStore
x => x.Status == JobStatus.Completed && .Where(x => x.Status == JobStatus.Completed &&
expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0) expiratime.CompareTo(x.LastRunTime ?? x.EndTime ?? x.CreationTime) <= 0)
.Take(maxResultCount); .Take(maxResultCount);

Loading…
Cancel
Save