Browse Source

Implement the ICacheSupportsMultipleItems for the AbpRedisCache

pull/4500/head
liangshiwei 6 years ago
parent
commit
0137824361
  1. 496
      framework/src/Volo.Abp.Caching.StackExchangeRedis/Microsoft/Extensions/Caching/StackExchangeRedis/RedisCache.cs
  2. 70
      framework/src/Volo.Abp.Caching.StackExchangeRedis/Microsoft/Extensions/Caching/StackExchangeRedis/RedisExtensions.cs
  3. 187
      framework/src/Volo.Abp.Caching.StackExchangeRedis/Volo/Abp/Caching/StackExchangeRedis/AbpRedisCache.cs
  4. 42
      framework/src/Volo.Abp.Caching.StackExchangeRedis/Volo/Abp/Caching/StackExchangeRedis/AbpRedisExtensions.cs

496
framework/src/Volo.Abp.Caching.StackExchangeRedis/Microsoft/Extensions/Caching/StackExchangeRedis/RedisCache.cs

@ -0,0 +1,496 @@
// This software is part of the DOTNET extensions
// Copyright (c) .NET Foundation and Contributors
// https://dotnet.microsoft.com/
//
// All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
namespace Microsoft.Extensions.Caching.StackExchangeRedis
{
public class RedisCache : IDistributedCache, IDisposable
{
// KEYS[1] = = key
// ARGV[1] = absolute-expiration - ticks as long (-1 for none)
// ARGV[2] = sliding-expiration - ticks as long (-1 for none)
// ARGV[3] = relative-expiration (long, in seconds, -1 for none) - Min(absolute-expiration - Now, sliding-expiration)
// ARGV[4] = data - byte[]
// this order should not change LUA script depends on it
protected const string SetScript = (@"
redis.call('HMSET', KEYS[1], 'absexp', ARGV[1], 'sldexp', ARGV[2], 'data', ARGV[4])
if ARGV[3] ~= '-1' then
redis.call('EXPIRE', KEYS[1], ARGV[3])
end
return 1");
protected const string AbsoluteExpirationKey = "absexp";
protected const string SlidingExpirationKey = "sldexp";
protected const string DataKey = "data";
protected const long NotPresent = -1;
protected volatile ConnectionMultiplexer Connection;
protected IDatabase Cache;
protected readonly RedisCacheOptions Options;
protected readonly string Instance;
protected readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
public RedisCache(IOptions<RedisCacheOptions> optionsAccessor)
{
if (optionsAccessor == null)
{
throw new ArgumentNullException(nameof(optionsAccessor));
}
Options = optionsAccessor.Value;
// This allows partitioning a single backend cache for use with multiple apps/services.
Instance = Options.InstanceName ?? string.Empty;
}
public virtual byte[] Get(string key)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
return GetAndRefresh(key, getData: true);
}
public virtual async Task<byte[]> GetAsync(
string key,
CancellationToken token = default)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
return await GetAndRefreshAsync(key, getData: true, token: token);
}
public virtual void Set(
string key,
byte[] value,
DistributedCacheEntryOptions options)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
Connect();
var creationTime = DateTimeOffset.UtcNow;
var absoluteExpiration = GetAbsoluteExpiration(creationTime, options);
Cache.ScriptEvaluate(SetScript, new RedisKey[] {Instance + key},
new RedisValue[]
{
absoluteExpiration?.Ticks ?? NotPresent,
options.SlidingExpiration?.Ticks ?? NotPresent,
GetExpirationInSeconds(creationTime, absoluteExpiration, options) ?? NotPresent,
value
});
}
public virtual async Task SetAsync(
string key,
byte[] value,
DistributedCacheEntryOptions options,
CancellationToken token = default)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
token.ThrowIfCancellationRequested();
await ConnectAsync(token);
var creationTime = DateTimeOffset.UtcNow;
var absoluteExpiration = GetAbsoluteExpiration(creationTime, options);
await Cache.ScriptEvaluateAsync(SetScript, new RedisKey[] {Instance + key},
new RedisValue[]
{
absoluteExpiration?.Ticks ?? NotPresent,
options.SlidingExpiration?.Ticks ?? NotPresent,
GetExpirationInSeconds(creationTime, absoluteExpiration, options) ?? NotPresent,
value
});
}
public virtual void Refresh(string key)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
GetAndRefresh(key, getData: false);
}
public virtual async Task RefreshAsync(
string key,
CancellationToken token = default)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
await GetAndRefreshAsync(key, getData: false, token: token);
}
protected virtual void Connect()
{
if (Cache != null)
{
return;
}
ConnectionLock.Wait();
try
{
if (Cache == null)
{
if (Options.ConfigurationOptions != null)
{
Connection = ConnectionMultiplexer.Connect(Options.ConfigurationOptions);
}
else
{
Connection = ConnectionMultiplexer.Connect(Options.Configuration);
}
Cache = Connection.GetDatabase();
}
}
finally
{
ConnectionLock.Release();
}
}
protected virtual async Task ConnectAsync(CancellationToken token = default)
{
token.ThrowIfCancellationRequested();
if (Cache != null)
{
return;
}
await ConnectionLock.WaitAsync(token);
try
{
if (Cache == null)
{
if (Options.ConfigurationOptions != null)
{
Connection = await ConnectionMultiplexer.ConnectAsync(Options.ConfigurationOptions);
}
else
{
Connection = await ConnectionMultiplexer.ConnectAsync(Options.Configuration);
}
Cache = Connection.GetDatabase();
}
}
finally
{
ConnectionLock.Release();
}
}
protected virtual byte[] GetAndRefresh(string key, bool getData)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
Connect();
// This also resets the LRU status as desired.
// TODO: Can this be done in one operation on the server side? Probably, the trick would just be the DateTimeOffset math.
RedisValue[] results;
if (getData)
{
results = Cache.HashMemberGet(Instance + key, AbsoluteExpirationKey, SlidingExpirationKey, DataKey);
}
else
{
results = Cache.HashMemberGet(Instance + key, AbsoluteExpirationKey, SlidingExpirationKey);
}
// TODO: Error handling
if (results.Length >= 2)
{
MapMetadata(results, out DateTimeOffset? absExpr, out TimeSpan? sldExpr);
Refresh(key, absExpr, sldExpr);
}
if (results.Length >= 3 && results[2].HasValue)
{
return results[2];
}
return null;
}
protected virtual async Task<byte[]> GetAndRefreshAsync(
string key,
bool getData,
CancellationToken token = default)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
await ConnectAsync(token);
// This also resets the LRU status as desired.
// TODO: Can this be done in one operation on the server side? Probably, the trick would just be the DateTimeOffset math.
RedisValue[] results;
if (getData)
{
results = await Cache.HashMemberGetAsync(Instance + key, AbsoluteExpirationKey, SlidingExpirationKey,
DataKey);
}
else
{
results = await Cache.HashMemberGetAsync(Instance + key, AbsoluteExpirationKey, SlidingExpirationKey);
}
// TODO: Error handling
if (results.Length >= 2)
{
MapMetadata(results, out DateTimeOffset? absExpr, out TimeSpan? sldExpr);
await RefreshAsync(key, absExpr, sldExpr, token);
}
if (results.Length >= 3 && results[2].HasValue)
{
return results[2];
}
return null;
}
public virtual void Remove(string key)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
Connect();
Cache.KeyDelete(Instance + key);
// TODO: Error handling
}
public virtual async Task RemoveAsync(
string key,
CancellationToken token = default)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
await ConnectAsync(token);
await Cache.KeyDeleteAsync(Instance + key);
// TODO: Error handling
}
protected virtual void MapMetadata(
RedisValue[] results,
out DateTimeOffset? absoluteExpiration,
out TimeSpan? slidingExpiration)
{
absoluteExpiration = null;
slidingExpiration = null;
var absoluteExpirationTicks = (long?) results[0];
if (absoluteExpirationTicks.HasValue && absoluteExpirationTicks.Value != NotPresent)
{
absoluteExpiration = new DateTimeOffset(absoluteExpirationTicks.Value, TimeSpan.Zero);
}
var slidingExpirationTicks = (long?) results[1];
if (slidingExpirationTicks.HasValue && slidingExpirationTicks.Value != NotPresent)
{
slidingExpiration = new TimeSpan(slidingExpirationTicks.Value);
}
}
protected virtual void Refresh(
string key,
DateTimeOffset? absExpr,
TimeSpan? sldExpr)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
// Note Refresh has no effect if there is just an absolute expiration (or neither).
TimeSpan? expr = null;
if (sldExpr.HasValue)
{
if (absExpr.HasValue)
{
var relExpr = absExpr.Value - DateTimeOffset.Now;
expr = relExpr <= sldExpr.Value ? relExpr : sldExpr;
}
else
{
expr = sldExpr;
}
Cache.KeyExpire(Instance + key, expr);
// TODO: Error handling
}
}
protected virtual async Task RefreshAsync(
string key,
DateTimeOffset? absExpr,
TimeSpan? sldExpr,
CancellationToken token = default)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
// Note Refresh has no effect if there is just an absolute expiration (or neither).
TimeSpan? expr = null;
if (sldExpr.HasValue)
{
if (absExpr.HasValue)
{
var relExpr = absExpr.Value - DateTimeOffset.Now;
expr = relExpr <= sldExpr.Value ? relExpr : sldExpr;
}
else
{
expr = sldExpr;
}
await Cache.KeyExpireAsync(Instance + key, expr);
// TODO: Error handling
}
}
protected static long? GetExpirationInSeconds(
DateTimeOffset creationTime,
DateTimeOffset? absoluteExpiration,
DistributedCacheEntryOptions options)
{
if (absoluteExpiration.HasValue && options.SlidingExpiration.HasValue)
{
return (long) Math.Min(
(absoluteExpiration.Value - creationTime).TotalSeconds,
options.SlidingExpiration.Value.TotalSeconds);
}
else if (absoluteExpiration.HasValue)
{
return (long) (absoluteExpiration.Value - creationTime).TotalSeconds;
}
else if (options.SlidingExpiration.HasValue)
{
return (long) options.SlidingExpiration.Value.TotalSeconds;
}
return null;
}
protected static DateTimeOffset? GetAbsoluteExpiration(
DateTimeOffset creationTime,
DistributedCacheEntryOptions options)
{
if (options.AbsoluteExpiration.HasValue && options.AbsoluteExpiration <= creationTime)
{
throw new ArgumentOutOfRangeException(
nameof(DistributedCacheEntryOptions.AbsoluteExpiration),
options.AbsoluteExpiration.Value,
"The absolute expiration value must be in the future.");
}
var absoluteExpiration = options.AbsoluteExpiration;
if (options.AbsoluteExpirationRelativeToNow.HasValue)
{
absoluteExpiration = creationTime + options.AbsoluteExpirationRelativeToNow;
}
return absoluteExpiration;
}
public virtual void Dispose()
{
Connection?.Close();
}
}
}

70
framework/src/Volo.Abp.Caching.StackExchangeRedis/Microsoft/Extensions/Caching/StackExchangeRedis/RedisExtensions.cs

@ -0,0 +1,70 @@
// This software is part of the DOTNET extensions
// Copyright (c) .NET Foundation and Contributors
// https://dotnet.microsoft.com/
//
// All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Microsoft.Extensions.Caching.StackExchangeRedis
{
internal static class RedisExtensions
{
private const string HmGetScript = (@"return redis.call('HMGET', KEYS[1], unpack(ARGV))");
internal static RedisValue[] HashMemberGet(this IDatabase cache, string key, params string[] members)
{
var result = cache.ScriptEvaluate(
HmGetScript,
new RedisKey[] { key },
GetRedisMembers(members));
// TODO: Error checking?
return (RedisValue[])result;
}
internal static async Task<RedisValue[]> HashMemberGetAsync(
this IDatabase cache,
string key,
params string[] members)
{
var result = await cache.ScriptEvaluateAsync(
HmGetScript,
new RedisKey[] { key },
GetRedisMembers(members)).ConfigureAwait(false);
// TODO: Error checking?
return (RedisValue[])result;
}
private static RedisValue[] GetRedisMembers(params string[] members)
{
var redisMembers = new RedisValue[members.Length];
for (int i = 0; i < members.Length; i++)
{
redisMembers[i] = (RedisValue)members[i];
}
return redisMembers;
}
}
}

187
framework/src/Volo.Abp.Caching.StackExchangeRedis/Volo/Abp/Caching/StackExchangeRedis/AbpRedisCache.cs

@ -1,7 +1,9 @@
using System;
using System.Reflection;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Caching.StackExchangeRedis;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
@ -10,40 +12,179 @@ using Volo.Abp.DependencyInjection;
namespace Volo.Abp.Caching.StackExchangeRedis
{
[DisableConventionalRegistration]
public class AbpRedisCache : RedisCache
public class AbpRedisCache : RedisCache, ICacheSupportsMultipleItems
{
private static readonly FieldInfo RedisDatabaseField;
private static readonly MethodInfo ConnectMethod;
private static readonly MethodInfo ConnectAsyncMethod;
public AbpRedisCache(IOptions<RedisCacheOptions> optionsAccessor)
: base(optionsAccessor)
{
}
protected IDatabase RedisDatabase => RedisDatabaseField.GetValue(this) as IDatabase;
public byte[][] GetMany(
IEnumerable<string> keys)
{
keys = Check.NotNull(keys, nameof(keys));
static AbpRedisCache()
return GetAndRefreshMany(keys, true);
}
public async Task<byte[][]> GetManyAsync(
IEnumerable<string> keys,
CancellationToken token = default)
{
RedisDatabaseField = typeof(RedisCache)
.GetField("_cache", BindingFlags.Instance | BindingFlags.NonPublic);
ConnectMethod = typeof(RedisCache)
.GetMethod("Connect", BindingFlags.Instance | BindingFlags.NonPublic);
ConnectAsyncMethod = typeof(RedisCache)
.GetMethod("ConnectAsync", BindingFlags.Instance | BindingFlags.NonPublic);
keys = Check.NotNull(keys, nameof(keys));
return await GetAndRefreshManyAsync(keys, true, token);
}
public AbpRedisCache(IOptions<RedisCacheOptions> optionsAccessor)
: base(optionsAccessor)
public void SetMany(
IEnumerable<KeyValuePair<string, byte[]>> items,
DistributedCacheEntryOptions options)
{
Connect();
Task.WaitAll(PipelineSetMany(items, options));
}
public async Task SetManyAsync(
IEnumerable<KeyValuePair<string, byte[]>> items,
DistributedCacheEntryOptions options,
CancellationToken token = default)
{
token.ThrowIfCancellationRequested();
await ConnectAsync(token);
await Task.WhenAll(PipelineSetMany(items, options));
}
protected virtual byte[][] GetAndRefreshMany(
IEnumerable<string> keys,
bool getData)
{
Connect();
var keyArray = keys.Select(key => Instance + key).ToArray();
RedisValue[][] results;
if (getData)
{
results = Cache.HashMemberGetMany(keyArray, AbsoluteExpirationKey,
SlidingExpirationKey, DataKey);
}
else
{
results = Cache.HashMemberGetMany(keyArray, AbsoluteExpirationKey,
SlidingExpirationKey);
}
Task.WaitAll(GetAndRefreshMany(keyArray, results, out var bytes));
return bytes;
}
protected virtual async Task<byte[][]> GetAndRefreshManyAsync(
IEnumerable<string> keys,
bool getData,
CancellationToken token = default)
{
token.ThrowIfCancellationRequested();
await ConnectAsync(token);
var keyArray = keys.Select(key => Instance + key).ToArray();
RedisValue[][] results;
if (getData)
{
results = await Cache.HashMemberGetManyAsync(keyArray, AbsoluteExpirationKey,
SlidingExpirationKey, DataKey);
}
else
{
results = await Cache.HashMemberGetManyAsync(keyArray, AbsoluteExpirationKey,
SlidingExpirationKey);
}
await Task.WhenAll(GetAndRefreshMany(keyArray, results, out var bytes));
return bytes;
}
protected virtual void Connect()
private Task[] GetAndRefreshMany(string[] keys, RedisValue[][] results, out byte[][] bytes)
{
ConnectMethod.Invoke(this, Array.Empty<object>());
bytes = new byte[keys.Length][];
var tasks = new Task[keys.Length];
for (var i = 0; i < keys.Length; i++)
{
if (results[i].Length >= 2)
{
MapMetadata(results[i], out DateTimeOffset? absExpr, out TimeSpan? sldExpr);
tasks[i] = PipelineRefresh(keys[i], absExpr, sldExpr);
}
if (results[i].Length >= 3 && results[i][2].HasValue)
{
bytes[i] = results[i][2];
}
else
{
bytes[i] = null;
}
}
return tasks;
}
private Task PipelineRefresh(
string key,
DateTimeOffset? absExpr,
TimeSpan? sldExpr)
{
if (sldExpr.HasValue)
{
TimeSpan? expr;
if (absExpr.HasValue)
{
var relExpr = absExpr.Value - DateTimeOffset.Now;
expr = relExpr <= sldExpr.Value ? relExpr : sldExpr;
}
else
{
expr = sldExpr;
}
return Cache.KeyExpireAsync(key, expr);
}
return Task.CompletedTask;
}
protected virtual Task ConnectAsync(CancellationToken token = default)
private Task[] PipelineSetMany(
IEnumerable<KeyValuePair<string, byte[]>> items,
DistributedCacheEntryOptions options)
{
return (Task) ConnectAsyncMethod.Invoke(this, new object[] {token});
items = Check.NotNull(items, nameof(items));
options = Check.NotNull(options, nameof(options));
var itemArray = items.ToArray();
var tasks = new Task[itemArray.Length];
var creationTime = DateTimeOffset.UtcNow;
var absoluteExpiration = GetAbsoluteExpiration(creationTime, options);
for (var i = 0; i < itemArray.Length; i++)
{
tasks[i] = Cache.ScriptEvaluateAsync(SetScript, new RedisKey[] {Instance + itemArray[i].Key},
new RedisValue[]
{
absoluteExpiration?.Ticks ?? NotPresent,
options.SlidingExpiration?.Ticks ?? NotPresent,
GetExpirationInSeconds(creationTime, absoluteExpiration, options) ?? NotPresent,
itemArray[i].Value
});
}
return tasks;
}
}
}
}

42
framework/src/Volo.Abp.Caching.StackExchangeRedis/Volo/Abp/Caching/StackExchangeRedis/AbpRedisExtensions.cs

@ -0,0 +1,42 @@
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace Volo.Abp.Caching.StackExchangeRedis
{
public static class AbpRedisExtensions
{
private const string HmGetManyScript = (@"
local res = {};
for i , key in ipairs(KEYS) do
res[#res+1] = redis.call('HMGET', key,unpack(ARGV))
end
return res");
public static RedisValue[][] HashMemberGetMany(
this IDatabase cache,
string[] keys,
params string[] members)
{
var results = cache.ScriptEvaluate(
HmGetManyScript,
keys.Select(key => (RedisKey) key).ToArray(),
members.Select(member => (RedisValue) member).ToArray());
return ((RedisResult[]) results).Select(result => (RedisValue[]) result).ToArray();
}
public static async Task<RedisValue[][]> HashMemberGetManyAsync(
this IDatabase cache,
string[] keys,
params string[] members)
{
var results = await cache.ScriptEvaluateAsync(
HmGetManyScript,
keys.Select(key => (RedisKey) key).ToArray(),
members.Select(member => (RedisValue) member).ToArray());
return ((RedisResult[]) results).Select(result => (RedisValue[]) result).ToArray();
}
}
}
Loading…
Cancel
Save