Browse Source

Use cloud events.

pull/14164/head
maliming 3 years ago
parent
commit
d7204df550
No known key found for this signature in database GPG Key ID: 96224957E51C89E
  1. 4
      docs/en/Dapr/Index.md
  2. 4
      docs/zh-Hans/Dapr/Index.md
  3. 25
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs
  4. 39
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs
  5. 44
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Json/AbpDaprSubscriptionRequestConverter.cs
  6. 33
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Json/AbpDaprSubscriptionRequestConverterFactory.cs
  7. 11
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Json/AbpDaprSubscriptionRequestJsonNamingPolicy.cs
  8. 11
      framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Models/AbpDaprSubscriptionRequest.cs

4
docs/en/Dapr/Index.md

@ -281,7 +281,7 @@ public class MyController : AbpController
{ {
[HttpPost("/stock-changed")] [HttpPost("/stock-changed")]
[Topic("pubsub", "StockChanged")] [Topic("pubsub", "StockChanged")]
public async Task<IActionResult> TestRouteAsync([FromBody] AbpDaprSubscriptionRequest<StockCountChangedEto> model) public async Task<IActionResult> TestRouteAsync([FromBody] StockCountChangedEto model)
{ {
HttpContext.ValidateDaprAppApiToken(); HttpContext.ValidateDaprAppApiToken();
@ -411,7 +411,7 @@ public class MyController : AbpController
{ {
[HttpPost("/stock-changed")] [HttpPost("/stock-changed")]
[Topic("pubsub", "StockChanged")] [Topic("pubsub", "StockChanged")]
public async Task<IActionResult> TestRouteAsync([FromBody] AbpDaprSubscriptionRequest<StockCountChangedEto> model) public async Task<IActionResult> TestRouteAsync([FromBody] StockCountChangedEto model)
{ {
// Validate the App API token! // Validate the App API token!
HttpContext.ValidateDaprAppApiToken(); HttpContext.ValidateDaprAppApiToken();

4
docs/zh-Hans/Dapr/Index.md

@ -281,7 +281,7 @@ public class MyController : AbpController
{ {
[HttpPost("/stock-changed")] [HttpPost("/stock-changed")]
[Topic("pubsub", "StockChanged")] [Topic("pubsub", "StockChanged")]
public async Task<IActionResult> TestRouteAsync([FromBody] AbpDaprSubscriptionRequest<StockCountChangedEto> model) public async Task<IActionResult> TestRouteAsync([FromBody] StockCountChangedEto model)
{ {
HttpContext.ValidateDaprAppApiToken(); HttpContext.ValidateDaprAppApiToken();
@ -411,7 +411,7 @@ public class MyController : AbpController
{ {
[HttpPost("/stock-changed")] [HttpPost("/stock-changed")]
[Topic("pubsub", "StockChanged")] [Topic("pubsub", "StockChanged")]
public async Task<IActionResult> TestRouteAsync([FromBody] AbpDaprSubscriptionRequest<StockCountChangedEto> model) public async Task<IActionResult> TestRouteAsync([FromBody] StockCountChangedEto model)
{ {
// Validate the App API token! // Validate the App API token!
HttpContext.ValidateDaprAppApiToken(); HttpContext.ValidateDaprAppApiToken();

25
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/AbpAspNetCoreMvcDaprEventBusModule.cs

@ -1,14 +1,10 @@
using System; using System.Linq;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapr; using Dapr;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Routing; using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Json;
using Volo.Abp.Dapr;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus; using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Dapr; using Volo.Abp.EventBus.Dapr;
@ -25,12 +21,6 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
{ {
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
context.Services.AddOptions<JsonOptions>()
.Configure<IServiceProvider>((options, serviceProvider) =>
{
options.JsonSerializerOptions.Converters.Add(new AbpDaprSubscriptionRequestConverterFactory(serviceProvider.GetRequiredService<IDaprSerializer>()));
});
var subscribeOptions = context.Services.ExecutePreConfiguredActions<AbpSubscribeOptions>(); var subscribeOptions = context.Services.ExecutePreConfiguredActions<AbpSubscribeOptions>();
Configure<AbpEndpointRouterOptions>(options => Configure<AbpEndpointRouterOptions>(options =>
@ -54,12 +44,19 @@ public class AbpAspNetCoreMvcDaprEventBusModule : AbpModule
continue; continue;
} }
subscriptions.Add(new AbpSubscription() var subscription = new AbpSubscription
{ {
PubsubName = daprEventBusOptions.PubSubName, PubsubName = daprEventBusOptions.PubSubName,
Topic = eventName, Topic = eventName,
Route = AbpAspNetCoreMvcDaprPubSubConsts.DaprEventCallbackUrl Route = AbpAspNetCoreMvcDaprPubSubConsts.DaprEventCallbackUrl,
}); Metadata = new AbpMetadata
{
{
AbpMetadata.RawPayload, "true"
}
}
};
subscriptions.Add(subscription);
} }
} }

39
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Controllers/AbpAspNetCoreMvcDaprEventsController.cs

@ -1,11 +1,9 @@
using System.Collections.Concurrent; using System;
using System.Text.Json; using System.Text.Json;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Json;
using Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Models;
using Volo.Abp.Dapr; using Volo.Abp.Dapr;
using Volo.Abp.EventBus.Dapr; using Volo.Abp.EventBus.Dapr;
@ -21,31 +19,20 @@ public class AbpAspNetCoreMvcDaprEventsController : AbpController
HttpContext.ValidateDaprAppApiToken(); HttpContext.ValidateDaprAppApiToken();
var daprSerializer = HttpContext.RequestServices.GetRequiredService<IDaprSerializer>(); var daprSerializer = HttpContext.RequestServices.GetRequiredService<IDaprSerializer>();
var request = (await JsonDocument.ParseAsync(HttpContext.Request.Body)).Deserialize<AbpDaprSubscriptionRequest<object>>(CreateJsonSerializerOptions(daprSerializer)); var body = (await JsonDocument.ParseAsync(HttpContext.Request.Body));
if (request != null && request.Data is JsonElement jsonElement)
var pubSubName = body.RootElement.GetProperty("pubsubname").GetString();
var topic = body.RootElement.GetProperty("topic").GetString();
var data = body.RootElement.GetProperty("data").GetRawText();
if (pubSubName.IsNullOrWhiteSpace() || topic.IsNullOrWhiteSpace() || data.IsNullOrWhiteSpace())
{ {
var distributedEventBus = HttpContext.RequestServices.GetRequiredService<DaprDistributedEventBus>(); Logger.LogError("Invalid Dapr event request.");
var eventData = daprSerializer.Deserialize(jsonElement.GetRawText(), distributedEventBus.GetEventType(request.Topic)); return BadRequest();
await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(request.Topic), eventData);
return Ok();
} }
Logger.LogError("Invalid Dapr event request."); var distributedEventBus = HttpContext.RequestServices.GetRequiredService<DaprDistributedEventBus>();
return BadRequest(); var eventData = daprSerializer.Deserialize(data, distributedEventBus.GetEventType(topic));
} await distributedEventBus.TriggerHandlersAsync(distributedEventBus.GetEventType(topic), eventData);
return Ok();
private static readonly ConcurrentDictionary<string, JsonSerializerOptions> JsonSerializerOptionsCache = new ConcurrentDictionary<string, JsonSerializerOptions>();
protected virtual JsonSerializerOptions CreateJsonSerializerOptions(IDaprSerializer daprSerializer)
{
return JsonSerializerOptionsCache.GetOrAdd(nameof(AbpAspNetCoreMvcDaprEventsController), _ =>
{
var settings = new JsonSerializerOptions(JsonSerializerDefaults.Web)
{
PropertyNamingPolicy = new AbpDaprSubscriptionRequestJsonNamingPolicy()
};
settings.Converters.Add(new AbpDaprSubscriptionRequestConverterFactory(daprSerializer));
return settings;
});
} }
} }

44
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Json/AbpDaprSubscriptionRequestConverter.cs

@ -1,44 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
using Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Models;
using Volo.Abp.Dapr;
namespace Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Json;
public class AbpDaprSubscriptionRequestConverter<T> : JsonConverter<AbpDaprSubscriptionRequest<T>>
where T : class
{
private JsonSerializerOptions _readJsonSerializerOptions;
private readonly IDaprSerializer _daprSerializer;
public AbpDaprSubscriptionRequestConverter(IDaprSerializer daprSerializer)
{
_daprSerializer = daprSerializer;
}
public override AbpDaprSubscriptionRequest<T> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
_readJsonSerializerOptions ??= CreateJsonSerializerOptions(options);
var rootElement = JsonDocument.ParseValue(ref reader).RootElement;
var obj = JsonSerializer.Deserialize<AbpDaprSubscriptionRequest<T>>(rootElement.GetRawText(), _readJsonSerializerOptions);
obj.Data = _daprSerializer.Deserialize(rootElement.GetProperty("data").GetRawText(), typeof(T)).As<T>();
return obj;
}
public override void Write(Utf8JsonWriter writer, AbpDaprSubscriptionRequest<T> value, JsonSerializerOptions options)
{
throw new NotSupportedException();
}
private JsonSerializerOptions CreateJsonSerializerOptions(JsonSerializerOptions options)
{
var newOptions = new JsonSerializerOptions(options);
newOptions.Converters.RemoveAll(x => x == this || x.GetType() == typeof(AbpDaprSubscriptionRequestConverterFactory));
newOptions.PropertyNamingPolicy = new AbpDaprSubscriptionRequestJsonNamingPolicy();
return newOptions;
}
}

33
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Json/AbpDaprSubscriptionRequestConverterFactory.cs

@ -1,33 +0,0 @@
using System;
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
using Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Models;
using Volo.Abp.Dapr;
namespace Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Json;
public class AbpDaprSubscriptionRequestConverterFactory : JsonConverterFactory
{
private readonly IDaprSerializer _daprSerializer;
public AbpDaprSubscriptionRequestConverterFactory(IDaprSerializer daprSerializer)
{
_daprSerializer = daprSerializer;
}
public override bool CanConvert(Type typeToConvert)
{
return typeToConvert.GetGenericTypeDefinition() == typeof(AbpDaprSubscriptionRequest<>);
}
public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options)
{
return (JsonConverter)Activator.CreateInstance(
typeof(AbpDaprSubscriptionRequestConverter<>).MakeGenericType(typeToConvert.GetGenericArguments()[0]),
BindingFlags.Instance | BindingFlags.Public,
binder: null,
new object[] { _daprSerializer },
culture: null)!;
}
}

11
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Json/AbpDaprSubscriptionRequestJsonNamingPolicy.cs

@ -1,11 +0,0 @@
using System.Text.Json;
namespace Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Json;
public class AbpDaprSubscriptionRequestJsonNamingPolicy : JsonNamingPolicy
{
public override string ConvertName(string name)
{
return name.ToLower();
}
}

11
framework/src/Volo.Abp.AspNetCore.Mvc.Dapr.EventBus/Volo/Abp/AspNetCore/Mvc/Dapr/EventBus/Models/AbpDaprSubscriptionRequest.cs

@ -1,11 +0,0 @@
namespace Volo.Abp.AspNetCore.Mvc.Dapr.EventBus.Models;
public class AbpDaprSubscriptionRequest<T>
where T : class
{
public string PubSubName { get; set; }
public string Topic { get; set; }
public T Data { get; set; }
}
Loading…
Cancel
Save