Browse Source

Merge pull request #987 from colinin/identity-server-session

Identity server session
pull/988/head
yx lin 1 year ago
committed by GitHub
parent
commit
d9a3d28f1a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 2
      .github/workflows/publish.yml
  2. 4
      .github/workflows/release.yml
  3. 6
      Directory.Packages.props
  4. 6
      apps/vue/src/api/caching-management/cache/index.ts
  5. 7
      apps/vue/src/api/text-templating/contents/index.ts
  6. 33
      apps/vue/src/views/feature-management/definitions/features/components/FeatureDefinitionTable.vue
  7. 30
      apps/vue/src/views/permission-management/definitions/permissions/components/PermissionDefinitionTable.vue
  8. 32
      apps/vue/src/views/permission-management/definitions/permissions/datas/TableData.ts
  9. 26
      apps/vue/src/views/realtime/notifications/definitions/notifications/components/NotificationDefinitionTable.vue
  10. 8
      apps/vue/src/views/realtime/notifications/definitions/notifications/components/NotificationSendModal.vue
  11. 2
      aspnet-core/framework/cli/LINGYUN.Abp.Cli/LINGYUN.Abp.Cli.csproj
  12. 228
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPConsumerServiceSelector.cs
  13. 63
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs
  14. 19
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusOptions.cs
  15. 87
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPExecutionFailedException.cs
  16. 15
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs
  17. 63
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs
  18. 351
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs
  19. 111
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCapSerializer.cs
  20. 431
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs
  21. 199
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CustomDistributedEventSubscriber.cs
  22. 37
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/FailedThresholdCallbackNotifier.cs
  23. 33
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/ICustomDistributedEventSubscriber.cs
  24. 9
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/IFailedThresholdCallbackNotifier.cs
  25. 194
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/AwaitableInfo.cs
  26. 69
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/CoercedAwaitableInfo.cs
  27. 568
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/ObjectMethodExecutor.cs
  28. 171
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/ObjectMethodExecutorAwaitable.cs
  29. 231
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/ObjectMethodExecutorFSharpSupport.cs
  30. 33
      aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs
  31. 7
      aspnet-core/modules/identityServer/LINGYUN.Abp.IdentityServer.Session/LINGYUN/Abp/IdentityServer/Session/AbpIdentityServerSessionModule.cs
  32. 4
      aspnet-core/services/LY.MicroService.AuthServer/appsettings.Development.json
  33. 4
      aspnet-core/services/LY.MicroService.IdentityServer/appsettings.Development.json
  34. 6
      aspnet-core/templates/content/Directory.Packages.props
  35. 4
      aspnet-core/templates/content/common.props
  36. 4
      common.props

2
.github/workflows/publish.yml

@ -2,7 +2,7 @@ name: "Publish"
on: on:
push: push:
branches: [ rel-8.2.0 ] branches: [ rel-8.2.1 ]
env: env:
DOTNET_VERSION: "8.0.200" DOTNET_VERSION: "8.0.200"

4
.github/workflows/release.yml

@ -2,7 +2,7 @@ name: "Tagged Release"
on: on:
push: push:
branches: [ rel-8.2.0 ] branches: [ rel-8.2.1 ]
jobs: jobs:
tagged-release: tagged-release:
@ -14,4 +14,4 @@ jobs:
with: with:
repo_token: "${{ secrets.GITHUB_TOKEN }}" repo_token: "${{ secrets.GITHUB_TOKEN }}"
prerelease: false prerelease: false
automatic_release_tag: "8.2.0" automatic_release_tag: "8.2.1"

6
Directory.Packages.props

@ -1,9 +1,9 @@
<Project> <Project>
<PropertyGroup> <PropertyGroup>
<DotNetCoreCAPPackageVersion>8.1.1</DotNetCoreCAPPackageVersion> <DotNetCoreCAPPackageVersion>8.2.0</DotNetCoreCAPPackageVersion>
<ElsaPackageVersion>2.14.1</ElsaPackageVersion> <ElsaPackageVersion>2.14.1</ElsaPackageVersion>
<VoloAbpPackageVersion>8.2.0</VoloAbpPackageVersion> <VoloAbpPackageVersion>8.2.1</VoloAbpPackageVersion>
<LINGYUNAbpPackageVersion>8.2.0</LINGYUNAbpPackageVersion> <LINGYUNAbpPackageVersion>8.2.1</LINGYUNAbpPackageVersion>
<MicrosoftExtensionsPackageVersion>8.0.0</MicrosoftExtensionsPackageVersion> <MicrosoftExtensionsPackageVersion>8.0.0</MicrosoftExtensionsPackageVersion>
<MicrosoftAspNetCorePackageVersion>8.0.0</MicrosoftAspNetCorePackageVersion> <MicrosoftAspNetCorePackageVersion>8.0.0</MicrosoftAspNetCorePackageVersion>
<MicrosoftEntityFrameworkCorePackageVersion>8.0.0</MicrosoftEntityFrameworkCorePackageVersion> <MicrosoftEntityFrameworkCorePackageVersion>8.0.0</MicrosoftEntityFrameworkCorePackageVersion>

6
apps/vue/src/api/caching-management/cache/index.ts

@ -8,14 +8,14 @@ import {
export const getKeys = (input: GetCacheKeysRequest) => { export const getKeys = (input: GetCacheKeysRequest) => {
return defHttp.get<CacheKeys>({ return defHttp.get<CacheKeys>({
url: '/api/caching-management/cache', url: '/api/caching-management/cache/keys',
params: input, params: input,
}); });
}; };
export const getValue = (key: string) => { export const getValue = (key: string) => {
return defHttp.get<CacheValue>({ return defHttp.get<CacheValue>({
url: `/api/caching-management/cache?key=${key}`, url: `/api/caching-management/cache/value?key=${key}`,
}); });
}; };
@ -28,6 +28,6 @@ export const refresh = (input: CacheRefreshRequest) => {
export const remove = (key: string) => { export const remove = (key: string) => {
return defHttp.delete<void>({ return defHttp.delete<void>({
url: `/api/caching-management/cache?key=${key}`, url: `/api/caching-management/cache/remove?key=${key}`,
}); });
}; };

7
apps/vue/src/api/text-templating/contents/index.ts

@ -7,9 +7,12 @@ import {
} from './model'; } from './model';
export const GetAsyncByInput = (input: TextTemplateContentGetInput) => { export const GetAsyncByInput = (input: TextTemplateContentGetInput) => {
let url = `/api/text-templating/templates/content/${input.name}`;
if (input.culture) {
url = `/api/text-templating/templates/content/${input.culture}/${input.name}`;
}
return defHttp.get<TextTemplateContentDto>({ return defHttp.get<TextTemplateContentDto>({
url: `/api/text-templating/templates/content`, url,
params: input,
}); });
}; };

33
apps/vue/src/views/feature-management/definitions/features/components/FeatureDefinitionTable.vue

@ -17,19 +17,13 @@
</template> </template>
<template #bodyCell="{ column, record }"> <template #bodyCell="{ column, record }">
<template v-if="column.key === 'displayName'"> <template v-if="column.key === 'displayName'">
<span>{{ getGroupDisplayName(record.displayName) }}</span> <span>{{ getDisplayName(record.displayName) }}</span>
</template> </template>
</template> </template>
<template #expandedRowRender="{ record }"> <template #expandedRowRender="{ record }">
<BasicTable @register="registerSubTable" :data-source="record.features"> <BasicTable @register="registerSubTable" :data-source="record.features">
<template #bodyCell="{ column, record }"> <template #bodyCell="{ column, record }">
<template v-if="column.key === 'groupName'"> <template v-if="column.key === 'displayName'">
<span>{{ getGroupDisplayName(record.groupName) }}</span>
</template>
<template v-else-if="column.key === 'parentName'">
<span>{{ getDisplayName(record.parentName) }}</span>
</template>
<template v-else-if="column.key === 'displayName'">
<span>{{ getDisplayName(record.displayName) }}</span> <span>{{ getDisplayName(record.displayName) }}</span>
</template> </template>
<template v-else-if="column.key === 'description'"> <template v-else-if="column.key === 'description'">
@ -92,7 +86,6 @@
import { getList, deleteByName } from '/@/api/feature-management/definitions/features'; import { getList, deleteByName } from '/@/api/feature-management/definitions/features';
import { getSearchFormSchemas } from '../datas/ModalData'; import { getSearchFormSchemas } from '../datas/ModalData';
import { listToTree } from '/@/utils/helper/treeHelper'; import { listToTree } from '/@/utils/helper/treeHelper';
import { groupBy } from '/@/utils/array';
import { sorter } from '/@/utils/table'; import { sorter } from '/@/utils/table';
import FeatureDefinitionModal from './FeatureDefinitionModal.vue'; import FeatureDefinitionModal from './FeatureDefinitionModal.vue';
@ -181,14 +174,6 @@
}; };
}); });
}); });
const getGroupDisplayName = computed(() => {
return (groupName: string) => {
const group = state.groups.find((x) => x.name === groupName);
if (!group) return groupName;
const info = deserialize(group.displayName);
return Lr(info.resourceName, info.name);
};
});
const getDisplayName = computed(() => { const getDisplayName = computed(() => {
return (displayName?: string) => { return (displayName?: string) => {
if (!displayName) return displayName; if (!displayName) return displayName;
@ -198,8 +183,7 @@
}); });
onMounted(() => { onMounted(() => {
fetch(); fetchGroups().then(fetch);
fetchGroups();
}); });
function fetch() { function fetch() {
@ -210,15 +194,14 @@
var input = form.getFieldsValue(); var input = form.getFieldsValue();
getList(input) getList(input)
.then((res) => { .then((res) => {
const featureGroup = groupBy(res.items, 'groupName');
const featureGroupData: FeatureGroup[] = []; const featureGroupData: FeatureGroup[] = [];
Object.keys(featureGroup).forEach((gk) => { state.groups.forEach((group) => {
const groupData: FeatureGroup = { const groupData: FeatureGroup = {
name: gk, name: group.name,
displayName: gk, displayName: group.displayName,
features: [], features: [],
}; };
const featureTree = listToTree(featureGroup[gk], { const featureTree = listToTree(res.items.filter((item) => item.groupName === group.name), {
id: 'name', id: 'name',
pid: 'parentName', pid: 'parentName',
}); });
@ -236,7 +219,7 @@
} }
function fetchGroups() { function fetchGroups() {
getGroupDefinitions({}).then((res) => { return getGroupDefinitions({}).then((res) => {
state.groups = res.items; state.groups = res.items;
}); });
} }

30
apps/vue/src/views/permission-management/definitions/permissions/components/PermissionDefinitionTable.vue

@ -17,16 +17,13 @@
</template> </template>
<template #bodyCell="{ column, record }"> <template #bodyCell="{ column, record }">
<template v-if="column.key === 'displayName'"> <template v-if="column.key === 'displayName'">
<span>{{ getGroupDisplayName(record.displayName) }}</span> <span>{{ getDisplayName(record.displayName) }}</span>
</template> </template>
</template> </template>
<template #expandedRowRender="{ record }"> <template #expandedRowRender="{ record }">
<BasicTable @register="registerSubTable" :data-source="record.permissions"> <BasicTable @register="registerSubTable" :data-source="record.permissions">
<template #bodyCell="{ column, record }"> <template #bodyCell="{ column, record }">
<template v-if="column.key === 'groupName'"> <template v-if="column.key === 'displayName'">
<span>{{ getGroupDisplayName(record.groupName) }}</span>
</template>
<template v-else-if="column.key === 'displayName'">
<span>{{ getDisplayName(record.displayName) }}</span> <span>{{ getDisplayName(record.displayName) }}</span>
</template> </template>
<template v-else-if="column.key === 'multiTenancySide'"> <template v-else-if="column.key === 'multiTenancySide'">
@ -94,7 +91,6 @@
import { multiTenancySidesMap, providersMap } from '../../typing'; import { multiTenancySidesMap, providersMap } from '../../typing';
import { getSearchFormSchemas } from '../datas/ModalData'; import { getSearchFormSchemas } from '../datas/ModalData';
import { listToTree } from '/@/utils/helper/treeHelper'; import { listToTree } from '/@/utils/helper/treeHelper';
import { groupBy } from '/@/utils/array';
import { sorter } from '/@/utils/table'; import { sorter } from '/@/utils/table';
import PermissionDefinitionModal from './PermissionDefinitionModal.vue'; import PermissionDefinitionModal from './PermissionDefinitionModal.vue';
@ -180,14 +176,6 @@
}; };
}); });
}); });
const getGroupDisplayName = computed(() => {
return (groupName: string) => {
const group = state.groups.find((x) => x.name === groupName);
if (!group) return groupName;
const info = deserialize(group.displayName);
return Lr(info.resourceName, info.name);
};
});
const getDisplayName = computed(() => { const getDisplayName = computed(() => {
return (displayName?: string) => { return (displayName?: string) => {
if (!displayName) return displayName; if (!displayName) return displayName;
@ -197,8 +185,7 @@
}); });
onMounted(() => { onMounted(() => {
fetch(); fetchGroups().then(fetch);
fetchGroups();
}); });
function fetch() { function fetch() {
@ -209,15 +196,14 @@
var input = form.getFieldsValue(); var input = form.getFieldsValue();
GetListAsyncByInput(input) GetListAsyncByInput(input)
.then((res) => { .then((res) => {
const permissionGroup = groupBy(res.items, 'groupName');
const permissionGroupData: PermissionGroup[] = []; const permissionGroupData: PermissionGroup[] = [];
Object.keys(permissionGroup).forEach((gk) => { state.groups.forEach((group) => {
const groupData: PermissionGroup = { const groupData: PermissionGroup = {
name: gk, name: group.name,
displayName: gk, displayName: group.displayName,
permissions: [], permissions: [],
}; };
const permissionTree = listToTree(permissionGroup[gk], { const permissionTree = listToTree(res.items.filter((item) => item.groupName === group.name), {
id: 'name', id: 'name',
pid: 'parentName', pid: 'parentName',
}); });
@ -235,7 +221,7 @@
} }
function fetchGroups() { function fetchGroups() {
getGroupDefinitions({}).then((res) => { return getGroupDefinitions({}).then((res) => {
state.groups = res.items; state.groups = res.items;
}); });
} }

32
apps/vue/src/views/permission-management/definitions/permissions/datas/TableData.ts

@ -15,22 +15,6 @@ export function getDataColumns(): BasicColumn[] {
defaultHidden: true, defaultHidden: true,
sorter: (a, b) => sorter(a, b, 'isEnabled'), sorter: (a, b) => sorter(a, b, 'isEnabled'),
}, },
{
title: L('DisplayName:MultiTenancySide'),
dataIndex: 'multiTenancySide',
align: 'left',
width: 80,
resizable: true,
sorter: (a, b) => sorter(a, b, 'multiTenancySide'),
},
{
title: L('DisplayName:Providers'),
dataIndex: 'providers',
align: 'left',
width: 80,
resizable: true,
sorter: (a, b) => sorter(a, b, 'providers'),
},
{ {
title: L('DisplayName:Name'), title: L('DisplayName:Name'),
dataIndex: 'name', dataIndex: 'name',
@ -47,6 +31,22 @@ export function getDataColumns(): BasicColumn[] {
resizable: true, resizable: true,
sorter: (a, b) => sorter(a, b, 'displayName'), sorter: (a, b) => sorter(a, b, 'displayName'),
}, },
{
title: L('DisplayName:MultiTenancySide'),
dataIndex: 'multiTenancySide',
align: 'left',
width: 80,
resizable: true,
sorter: (a, b) => sorter(a, b, 'multiTenancySide'),
},
{
title: L('DisplayName:Providers'),
dataIndex: 'providers',
align: 'left',
width: 80,
resizable: true,
sorter: (a, b) => sorter(a, b, 'providers'),
},
{ {
title: L('DisplayName:IsStatic'), title: L('DisplayName:IsStatic'),
dataIndex: 'isStatic', dataIndex: 'isStatic',

26
apps/vue/src/views/realtime/notifications/definitions/notifications/components/NotificationDefinitionTable.vue

@ -33,7 +33,7 @@
</template> </template>
<template #bodyCell="{ column, record }"> <template #bodyCell="{ column, record }">
<template v-if="column.key === 'displayName'"> <template v-if="column.key === 'displayName'">
<span>{{ getGroupDisplayName(record.displayName) }}</span> <span>{{ getDisplayName(record.displayName) }}</span>
</template> </template>
</template> </template>
<template #expandedRowRender="{ record }"> <template #expandedRowRender="{ record }">
@ -123,7 +123,6 @@
} from '/@/api/realtime/notifications/definitions/notifications'; } from '/@/api/realtime/notifications/definitions/notifications';
import { NotificationDefinitionDto } from '/@/api/realtime/notifications/definitions/notifications/model'; import { NotificationDefinitionDto } from '/@/api/realtime/notifications/definitions/notifications/model';
import { getSearchFormSchemas } from '../datas/ModalData'; import { getSearchFormSchemas } from '../datas/ModalData';
import { groupBy } from '/@/utils/array';
import { sorter } from '/@/utils/table'; import { sorter } from '/@/utils/table';
import NotificationDefinitionModal from './NotificationDefinitionModal.vue'; import NotificationDefinitionModal from './NotificationDefinitionModal.vue';
import NotificationSendModal from './NotificationSendModal.vue'; import NotificationSendModal from './NotificationSendModal.vue';
@ -209,14 +208,6 @@
}; };
}); });
}); });
const getGroupDisplayName = computed(() => {
return (groupName: string) => {
const group = state.groups.find((x) => x.name === groupName);
if (!group) return groupName;
const info = deserialize(group.displayName);
return Lr(info.resourceName, info.name);
};
});
const getDisplayName = computed(() => { const getDisplayName = computed(() => {
return (displayName?: string) => { return (displayName?: string) => {
if (!displayName) return displayName; if (!displayName) return displayName;
@ -226,8 +217,7 @@
}); });
onMounted(() => { onMounted(() => {
fetch(); fetchGroups().then(fetch);
fetchGroups();
}); });
function fetch() { function fetch() {
@ -238,18 +228,16 @@
var input = form.getFieldsValue(); var input = form.getFieldsValue();
GetListAsyncByInput(input) GetListAsyncByInput(input)
.then((res) => { .then((res) => {
const definitionGroup = groupBy(res.items, 'groupName');
const definitionGroupData: NotificationGroup[] = []; const definitionGroupData: NotificationGroup[] = [];
Object.keys(definitionGroup).forEach((gk) => { state.groups.forEach((group) => {
const groupData: NotificationGroup = { const groupData: NotificationGroup = {
name: gk, name: group.name,
displayName: gk, displayName: group.displayName,
notifications: [], notifications: [],
}; };
groupData.notifications.push(...definitionGroup[gk]); groupData.notifications.push(...res.items.filter((item) => item.groupName === group.name));
definitionGroupData.push(groupData); definitionGroupData.push(groupData);
}); });
console.log(definitionGroupData);
setTableData(definitionGroupData); setTableData(definitionGroupData);
}) })
.finally(() => { .finally(() => {
@ -259,7 +247,7 @@
} }
function fetchGroups() { function fetchGroups() {
getGroupDefinitions({}).then((res) => { return getGroupDefinitions({}).then((res) => {
state.groups = res.items; state.groups = res.items;
}); });
} }

8
apps/vue/src/views/realtime/notifications/definitions/notifications/components/NotificationSendModal.vue

@ -267,12 +267,19 @@
const formEl = unref(formRef); const formEl = unref(formRef);
formEl?.validate().then(() => { formEl?.validate().then(() => {
let input: NotificationSendDto; let input: NotificationSendDto;
let toUsers: { userId: string }[] = [];
if (state.entity.toUsers && Array.isArray(state.entity.toUsers)) {
toUsers = state.entity.toUsers.map((id) => {
return { userId: id };
});
}
if (getIsTemplate.value) { if (getIsTemplate.value) {
input = { input = {
name: state.notification!.template!, name: state.notification!.template!,
severity: state.entity.severity, severity: state.entity.severity,
culture: state.entity.culture, culture: state.entity.culture,
data: state.entity.data, data: state.entity.data,
toUsers: toUsers,
}; };
changeOkLoading(true); changeOkLoading(true);
sendTemplate(input) sendTemplate(input)
@ -320,6 +327,7 @@
formUser: getApplication.currentUser.userName, formUser: getApplication.currentUser.userName,
createTime: formatToDateTime(new Date()), createTime: formatToDateTime(new Date()),
}, },
toUsers: toUsers,
}; };
changeOkLoading(true); changeOkLoading(true);
send(input) send(input)

2
aspnet-core/framework/cli/LINGYUN.Abp.Cli/LINGYUN.Abp.Cli.csproj

@ -5,7 +5,7 @@
<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType> <OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework> <TargetFramework>net8.0</TargetFramework>
<Version>8.2.0</Version> <Version>8.2.1</Version>
<Copyright>colin</Copyright> <Copyright>colin</Copyright>
<Description>Use LINGYUN.MicroService.Templates command line</Description> <Description>Use LINGYUN.MicroService.Templates command line</Description>
<PackAsTool>true</PackAsTool> <PackAsTool>true</PackAsTool>

228
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPConsumerServiceSelector.cs

@ -11,146 +11,144 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus; using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// 消费者查找器
/// </summary>
[Dependency(ServiceLifetime.Singleton, ReplaceServices = true)]
[ExposeServices(typeof(IConsumerServiceSelector), typeof(AbpCAPConsumerServiceSelector))]
public class AbpCAPConsumerServiceSelector : ConsumerServiceSelector
{ {
/// <summary> /// <summary>
/// 消费者查找器 /// CAP配置
/// </summary> /// </summary>
[Dependency(ServiceLifetime.Singleton, ReplaceServices = true)] protected CapOptions CapOptions { get; }
[ExposeServices(typeof(IConsumerServiceSelector), typeof(AbpCAPConsumerServiceSelector))] /// <summary>
/// Abp分布式事件配置
/// </summary>
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
/// <summary>
/// 服务提供者
/// </summary>
protected IServiceProvider ServiceProvider { get; }
public class AbpCAPConsumerServiceSelector : ConsumerServiceSelector /// <summary>
/// Creates a new <see cref="T:DotNetCore.CAP.Internal.ConsumerServiceSelector" />.
/// </summary>
public AbpCAPConsumerServiceSelector(
IServiceProvider serviceProvider,
IOptions<CapOptions> capOptions,
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions) : base(serviceProvider)
{
CapOptions = capOptions.Value;
ServiceProvider = serviceProvider;
AbpDistributedEventBusOptions = distributedEventBusOptions.Value;
}
/// <summary>
/// 查找消费者集合
/// </summary>
/// <param name="provider"></param>
/// <returns></returns>
protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
{ {
/// <summary> var executorDescriptorList = base.FindConsumersFromInterfaceTypes(provider).ToList();
/// CAP配置 //handlers
/// </summary> var handlers = AbpDistributedEventBusOptions.Handlers;
protected CapOptions CapOptions { get; } var logger = provider.GetRequiredService<ILogger<AbpCAPConsumerServiceSelector>>();
/// <summary> var consumerExecutorDescriptorComparer = new ConsumerExecutorDescriptorComparer(logger);
/// Abp分布式事件配置
/// </summary>
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
/// <summary>
/// 服务提供者
/// </summary>
protected IServiceProvider ServiceProvider { get; }
/// <summary> foreach (var handler in handlers)
/// Creates a new <see cref="T:DotNetCore.CAP.Internal.ConsumerServiceSelector" />.
/// </summary>
public AbpCAPConsumerServiceSelector(
IServiceProvider serviceProvider,
IOptions<CapOptions> capOptions,
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions) : base(serviceProvider)
{
CapOptions = capOptions.Value;
ServiceProvider = serviceProvider;
AbpDistributedEventBusOptions = distributedEventBusOptions.Value;
}
/// <summary>
/// 查找消费者集合
/// </summary>
/// <param name="provider"></param>
/// <returns></returns>
protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
{ {
var executorDescriptorList = var interfaces = handler.GetInterfaces();
base.FindConsumersFromInterfaceTypes(provider).ToList(); foreach (var @interface in interfaces)
//handlers
var handlers = AbpDistributedEventBusOptions.Handlers;
var logger = provider.GetRequiredService<ILogger<AbpCAPConsumerServiceSelector>>();
var consumerExecutorDescriptorComparer = new ConsumerExecutorDescriptorComparer(logger);
foreach (var handler in handlers)
{ {
var interfaces = handler.GetInterfaces(); if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
foreach (var @interface in interfaces)
{ {
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface)) continue;
{ }
continue; var genericArgs = @interface.GetGenericArguments();
} if (genericArgs.Length == 1)
var genericArgs = @interface.GetGenericArguments(); {
if (genericArgs.Length == 1) var consumerExecutorDescriptors = GetHandlerDescription(genericArgs[0], handler);
{
var consumerExecutorDescriptors = GetHandlerDescription(genericArgs[0], handler);
foreach (var consumerExecutorDescriptor in consumerExecutorDescriptors) foreach (var consumerExecutorDescriptor in consumerExecutorDescriptors)
{
if (executorDescriptorList.Any(x => consumerExecutorDescriptorComparer.Equals(x, consumerExecutorDescriptor)))
{ {
if (executorDescriptorList.Any(x => consumerExecutorDescriptorComparer.Equals(x, consumerExecutorDescriptor))) // 如果存在多个消费者,后续的消费者需要重新定义分组才能不被 CAP 框架过滤掉
{ var groupAliaName = handler.IsGenericType
// 如果存在多个消费者,后续的消费者需要重新定义分组才能不被 CAP 框架过滤掉 ? handler.GetGenericTypeDefinition().Name
var groupAliaName = handler.IsGenericType : handler.Name;
? handler.GetGenericTypeDefinition().Name
: handler.Name;
// TODO: 2021-05-21 直接使用类型全名作为GroupName会引起用户困惑,加上组别名称在前 // TODO: 2021-05-21 直接使用类型全名作为GroupName会引起用户困惑,加上组别名称在前
consumerExecutorDescriptor.Attribute.Group = $"{CapOptions.DefaultGroupName}.{groupAliaName}"; consumerExecutorDescriptor.Attribute.Group = $"{CapOptions.DefaultGroupName}.{groupAliaName}";
SetSubscribeAttribute(consumerExecutorDescriptor.Attribute); SetSubscribeAttribute(consumerExecutorDescriptor.Attribute);
}
executorDescriptorList.Add(consumerExecutorDescriptor);
} }
//Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler)); executorDescriptorList.Add(consumerExecutorDescriptor);
} }
//Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
} }
} }
return executorDescriptorList;
} }
/// <summary> return executorDescriptorList;
/// 获取事件处理器集合 }
/// </summary> /// <summary>
/// <param name="eventType"></param> /// 获取事件处理器集合
/// <param name="typeInfo"></param> /// </summary>
/// <returns></returns> /// <param name="eventType"></param>
protected virtual IEnumerable<ConsumerExecutorDescriptor> GetHandlerDescription(Type eventType, Type typeInfo) /// <param name="typeInfo"></param>
{ /// <returns></returns>
var serviceTypeInfo = typeof(IDistributedEventHandler<>) protected virtual IEnumerable<ConsumerExecutorDescriptor> GetHandlerDescription(Type eventType, Type typeInfo)
.MakeGenericType(eventType); {
var method = typeInfo var serviceTypeInfo = typeof(IDistributedEventHandler<>)
.GetMethod( .MakeGenericType(eventType);
nameof(IDistributedEventHandler<object>.HandleEventAsync), var method = typeInfo
new[] { eventType } .GetMethod(
); nameof(IDistributedEventHandler<object>.HandleEventAsync),
// TODO: 事件名称定义在事件参数类型,就无法创建多个订阅者类了,增加可选配置,让用户决定事件名称定义在哪里 new[] { eventType }
var eventName = EventNameAttribute.GetNameOrDefault(eventType); );
var topicAttr = method.GetCustomAttributes<TopicAttribute>(true); // TODO: 事件名称定义在事件参数类型,就无法创建多个订阅者类了,增加可选配置,让用户决定事件名称定义在哪里
var topicAttributes = topicAttr.ToList(); var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var topicAttr = method.GetCustomAttributes<TopicAttribute>(true);
var topicAttributes = topicAttr.ToList();
topicAttributes.Add(new CapSubscribeAttribute(eventName)); topicAttributes.Add(new CapSubscribeAttribute(eventName));
foreach (var attr in topicAttributes) foreach (var attr in topicAttributes)
{ {
SetSubscribeAttribute(attr); SetSubscribeAttribute(attr);
var parameters = method.GetParameters() var parameters = method.GetParameters()
.Select(parameter => new ParameterDescriptor .Select(parameter => new ParameterDescriptor
{ {
Name = parameter.Name, Name = parameter.Name,
ParameterType = parameter.ParameterType, ParameterType = parameter.ParameterType,
IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any() IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any()
}).ToList(); }).ToList();
yield return InitDescriptor(attr, method, typeInfo.GetTypeInfo(), serviceTypeInfo.GetTypeInfo(), parameters); yield return InitDescriptor(attr, method, typeInfo.GetTypeInfo(), serviceTypeInfo.GetTypeInfo(), parameters);
}
} }
}
private static ConsumerExecutorDescriptor InitDescriptor( private static ConsumerExecutorDescriptor InitDescriptor(
TopicAttribute attr, TopicAttribute attr,
MethodInfo methodInfo, MethodInfo methodInfo,
TypeInfo implType, TypeInfo implType,
TypeInfo serviceTypeInfo, TypeInfo serviceTypeInfo,
IList<ParameterDescriptor> parameters) IList<ParameterDescriptor> parameters)
{
var descriptor = new ConsumerExecutorDescriptor
{ {
var descriptor = new ConsumerExecutorDescriptor Attribute = attr,
{ MethodInfo = methodInfo,
Attribute = attr, ImplTypeInfo = implType,
MethodInfo = methodInfo, ServiceTypeInfo = serviceTypeInfo,
ImplTypeInfo = implType, Parameters = parameters
ServiceTypeInfo = serviceTypeInfo, };
Parameters = parameters
};
return descriptor; return descriptor;
}
} }
} }

63
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusModule.cs

@ -4,49 +4,48 @@ using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EventBus; using Volo.Abp.EventBus;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// AbpCAPEventBusModule
/// </summary>
[DependsOn(typeof(AbpEventBusModule))]
public class AbpCAPEventBusModule : AbpModule
{ {
/// <summary> /// <summary>
/// AbpCAPEventBusModule /// ConfigureServices
/// </summary> /// </summary>
[DependsOn(typeof(AbpEventBusModule))] /// <param name="context"></param>
public class AbpCAPEventBusModule : AbpModule public override void ConfigureServices(ServiceConfigurationContext context)
{ {
/// <summary> var configuration = context.Services.GetConfiguration();
/// ConfigureServices
/// </summary>
/// <param name="context"></param>
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
Configure<AbpCAPEventBusOptions>(configuration.GetSection("CAP:Abp")); Configure<AbpCAPEventBusOptions>(configuration.GetSection("CAP:Abp"));
context.Services.AddTransient<IFailedThresholdCallbackNotifier, FailedThresholdCallbackNotifier>(); context.Services.AddTransient<IFailedThresholdCallbackNotifier, FailedThresholdCallbackNotifier>();
var preActions = context.Services.GetPreConfigureActions<CapOptions>(); var preActions = context.Services.GetPreConfigureActions<CapOptions>();
context.Services.AddCAPEventBus(options => context.Services.AddCAPEventBus(options =>
{ {
// 取消默认的五分钟高频清理 // 取消默认的五分钟高频清理
// options.CollectorCleaningInterval = 360_0000; // options.CollectorCleaningInterval = 360_0000;
configuration.GetSection("CAP:EventBus").Bind(options); configuration.GetSection("CAP:EventBus").Bind(options);
preActions.Configure(options); preActions.Configure(options);
if (options.FailedThresholdCallback == null) if (options.FailedThresholdCallback == null)
{
options.FailedThresholdCallback = async (failed) =>
{ {
options.FailedThresholdCallback = async (failed) => var exceptionNotifier = failed.ServiceProvider.GetService<IFailedThresholdCallbackNotifier>();
if (exceptionNotifier != null)
{ {
var exceptionNotifier = failed.ServiceProvider.GetService<IFailedThresholdCallbackNotifier>(); // TODO: 作为异常处理?
if (exceptionNotifier != null) await exceptionNotifier.NotifyAsync(new AbpCAPExecutionFailedException(failed.MessageType, failed.Message));
{ }
// TODO: 作为异常处理? };
await exceptionNotifier.NotifyAsync(new AbpCAPExecutionFailedException(failed.MessageType, failed.Message)); }
} });
};
}
});
}
} }
} }

19
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPEventBusOptions.cs

@ -1,14 +1,13 @@
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// 过期消息清理配置项
/// </summary>
public class AbpCAPEventBusOptions
{ {
/// <summary> /// <summary>
/// 过期消息清理配置项 /// 发布消息处理失败通知
/// default: false
/// </summary> /// </summary>
public class AbpCAPEventBusOptions public bool NotifyFailedCallback { get; set; } = false;
{
/// <summary>
/// 发布消息处理失败通知
/// default: false
/// </summary>
public bool NotifyFailedCallback { get; set; } = false;
}
} }

87
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPExecutionFailedException.cs

@ -2,55 +2,54 @@
using System; using System;
using Volo.Abp; using Volo.Abp;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// AbpECAPExecutionFailedException
/// </summary>
public class AbpCAPExecutionFailedException : AbpException
{ {
/// <summary> /// <summary>
/// AbpECAPExecutionFailedException /// MessageType
/// </summary>
public MessageType MessageType { get; set; }
/// <summary>
/// Message
/// </summary> /// </summary>
public class AbpCAPExecutionFailedException : AbpException public Message Origin { get; set; }
/// <summary>
/// constructor
/// </summary>
/// <param name="messageType"></param>
/// <param name="origin"></param>
public AbpCAPExecutionFailedException(MessageType messageType, Message origin)
{ {
/// <summary> MessageType = messageType;
/// MessageType Origin = origin;
/// </summary> }
public MessageType MessageType { get; set; }
/// <summary>
/// Message
/// </summary>
public Message Origin { get; set; }
/// <summary>
/// constructor
/// </summary>
/// <param name="messageType"></param>
/// <param name="origin"></param>
public AbpCAPExecutionFailedException(MessageType messageType, Message origin)
{
MessageType = messageType;
Origin = origin;
}
/// <summary> /// <summary>
/// constructor /// constructor
/// </summary> /// </summary>
/// <param name="messageType"></param> /// <param name="messageType"></param>
/// <param name="origin"></param> /// <param name="origin"></param>
/// <param name="message"></param> /// <param name="message"></param>
public AbpCAPExecutionFailedException(MessageType messageType, Message origin, string message) : base(message) public AbpCAPExecutionFailedException(MessageType messageType, Message origin, string message) : base(message)
{ {
MessageType = messageType; MessageType = messageType;
Origin = origin; Origin = origin;
} }
/// <summary> /// <summary>
/// constructor /// constructor
/// </summary> /// </summary>
/// <param name="messageType"></param> /// <param name="messageType"></param>
/// <param name="origin"></param> /// <param name="origin"></param>
/// <param name="message"></param> /// <param name="message"></param>
/// <param name="innerException"></param> /// <param name="innerException"></param>
public AbpCAPExecutionFailedException(MessageType messageType, Message origin, string message, Exception innerException) : base(message, innerException) public AbpCAPExecutionFailedException(MessageType messageType, Message origin, string message, Exception innerException) : base(message, innerException)
{ {
MessageType = messageType; MessageType = messageType;
Origin = origin; Origin = origin;
}
} }
} }

15
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPHeaders.cs

@ -1,13 +1,12 @@
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
public static class AbpCAPHeaders
{ {
public static class AbpCAPHeaders public static string ClientId { get; set; } = "cap-abp-client-id";
{
public static string ClientId { get; set; } = "cap-abp-client-id";
public static string UserId { get; set; } = "cap-abp-user-id"; public static string UserId { get; set; } = "cap-abp-user-id";
public static string TenantId { get; set; } = "cap-abp-tenant-id"; public static string TenantId { get; set; } = "cap-abp-tenant-id";
public static string MessageId { get; set; } = "cap-abp-message-id"; public static string MessageId { get; set; } = "cap-abp-message-id";
}
} }

63
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPMessageExtensions.cs

@ -1,47 +1,46 @@
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using System; using System;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// CAP消息扩展
/// </summary>
public static class AbpCAPMessageExtensions
{ {
/// <summary> /// <summary>
/// CAP消息扩展 /// 尝试获取消息标头中的租户标识
/// </summary> /// </summary>
public static class AbpCAPMessageExtensions /// <param name="message"></param>
/// <param name="tenantId"></param>
/// <returns></returns>
public static bool TryGetTenantId(
this Message message,
out Guid? tenantId)
{ {
/// <summary> if (message.Headers.TryGetValue(AbpCAPHeaders.TenantId, out string tenantStr))
/// 尝试获取消息标头中的租户标识
/// </summary>
/// <param name="message"></param>
/// <param name="tenantId"></param>
/// <returns></returns>
public static bool TryGetTenantId(
this Message message,
out Guid? tenantId)
{ {
if (message.Headers.TryGetValue(AbpCAPHeaders.TenantId, out string tenantStr)) if (Guid.TryParse(tenantStr, out Guid id))
{ {
if (Guid.TryParse(tenantStr, out Guid id)) tenantId = id;
{ return true;
tenantId = id;
return true;
}
} }
tenantId = null;
return false;
} }
/// <summary> tenantId = null;
/// 获取消息标头中的租户标识 return false;
/// </summary> }
/// <param name="message"></param> /// <summary>
/// <returns></returns> /// 获取消息标头中的租户标识
public static Guid? GetTenantIdOrNull( /// </summary>
this Message message) /// <param name="message"></param>
/// <returns></returns>
public static Guid? GetTenantIdOrNull(
this Message message)
{
if (message.TryGetTenantId(out Guid? tenantId))
{ {
if (message.TryGetTenantId(out Guid? tenantId)) return tenantId;
{
return tenantId;
}
return null;
} }
return null;
} }
} }

351
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCAPSubscribeInvoker.cs

@ -8,88 +8,102 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel; using System.ComponentModel;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.MultiTenancy; using Volo.Abp.MultiTenancy;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// 重写 ISubscribeInvoker 实现 Abp 租户集成
/// </summary>
public class AbpCAPSubscribeInvoker : ISubscribeInvoker
{ {
private readonly ICurrentTenant _currentTenant;
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly ISerializer _serializer;
private readonly ConcurrentDictionary<string, ObjectMethodExecutor> _executors;
/// <summary> /// <summary>
/// 重写 ISubscribeInvoker 实现 Abp 租户集成 /// AbpCAPSubscribeInvoker
/// </summary> /// </summary>
public class AbpCAPSubscribeInvoker : ISubscribeInvoker /// <param name="loggerFactory"></param>
/// <param name="serviceProvider"></param>
/// <param name="serializer"></param>
/// <param name="currentTenant"></param>
public AbpCAPSubscribeInvoker(
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider,
ISerializer serializer,
ICurrentTenant currentTenant)
{ {
private readonly ICurrentTenant _currentTenant; _currentTenant = currentTenant;
_serviceProvider = serviceProvider;
private readonly ILogger _logger; _serializer = serializer;
private readonly IServiceProvider _serviceProvider; _logger = loggerFactory.CreateLogger<SubscribeInvoker>();
private readonly ISerializer _serializer; _executors = new ConcurrentDictionary<string, ObjectMethodExecutor>();
private readonly ConcurrentDictionary<string, ObjectMethodExecutor> _executors; }
/// <summary> /// <summary>
/// AbpCAPSubscribeInvoker /// 调用订阅者方法
/// </summary> /// </summary>
/// <param name="loggerFactory"></param> /// <param name="context"></param>
/// <param name="serviceProvider"></param> /// <param name="cancellationToken"></param>
/// <param name="serializer"></param> /// <returns></returns>
/// <param name="currentTenant"></param> public async virtual Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default)
public AbpCAPSubscribeInvoker( {
ILoggerFactory loggerFactory, cancellationToken.ThrowIfCancellationRequested();
IServiceProvider serviceProvider,
ISerializer serializer,
ICurrentTenant currentTenant)
{
_currentTenant = currentTenant;
_serviceProvider = serviceProvider;
_serializer = serializer;
_logger = loggerFactory.CreateLogger<SubscribeInvoker>();
_executors = new ConcurrentDictionary<string, ObjectMethodExecutor>();
}
/// <summary>
/// 调用订阅者方法
/// </summary>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async virtual Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
var methodInfo = context.ConsumerDescriptor.MethodInfo; var methodInfo = context.ConsumerDescriptor.MethodInfo;
var reflectedTypeHandle = methodInfo.ReflectedType!.TypeHandle.Value; var reflectedTypeHandle = methodInfo.ReflectedType!.TypeHandle.Value;
var methodHandle = methodInfo.MethodHandle.Value; var methodHandle = methodInfo.MethodHandle.Value;
var key = $"{reflectedTypeHandle}_{methodHandle}"; var key = $"{reflectedTypeHandle}_{methodHandle}";
_logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name); _logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name);
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo)); var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
using var scope = _serviceProvider.CreateScope(); using var scope = _serviceProvider.CreateScope();
var provider = scope.ServiceProvider; var provider = scope.ServiceProvider;
var obj = GetInstance(provider, context); var obj = GetInstance(provider, context);
var message = context.DeliverMessage; var message = context.DeliverMessage;
var parameterDescriptors = context.ConsumerDescriptor.Parameters; var parameterDescriptors = context.ConsumerDescriptor.Parameters;
var executeParameters = new object[parameterDescriptors.Count]; var executeParameters = new object[parameterDescriptors.Count];
// 租户数据可能在消息标头中 // 租户数据可能在消息标头中
var tenantId = message.GetTenantIdOrNull(); var tenantId = message.GetTenantIdOrNull();
for (var i = 0; i < parameterDescriptors.Count; i++) for (var i = 0; i < parameterDescriptors.Count; i++)
{
var parameterDescriptor = parameterDescriptors[i];
if (parameterDescriptor.IsFromCap)
{ {
var parameterDescriptor = parameterDescriptors[i]; executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken);
if (parameterDescriptor.IsFromCap) }
{ else
executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken); {
} if (message.Value != null)
else
{ {
if (message.Value != null) if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json
{
var eventData = _serializer.Deserialize(message.Value, parameterDescriptor.ParameterType);
// 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant)
{
tenantId = tenant.TenantId;
}
executeParameters[i] = eventData;
}
else
{ {
if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
if (converter.CanConvertFrom(message.Value.GetType()))
{ {
var eventData = _serializer.Deserialize(message.Value, parameterDescriptor.ParameterType); var eventData = converter.ConvertFrom(message.Value);
// 租户数据也可能存在事件数据中 // 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant) if (tenantId == null && eventData is IMultiTenant tenant)
{ {
@ -99,155 +113,152 @@ namespace LINGYUN.Abp.EventBus.CAP
} }
else else
{ {
var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType); if (parameterDescriptor.ParameterType.IsInstanceOfType(message.Value))
if (converter.CanConvertFrom(message.Value.GetType()))
{ {
var eventData = converter.ConvertFrom(message.Value);
// 租户数据也可能存在事件数据中 // 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant) if (tenantId == null && message.Value is IMultiTenant tenant)
{ {
tenantId = tenant.TenantId; tenantId = tenant.TenantId;
} }
executeParameters[i] = eventData; executeParameters[i] = message.Value;
} }
else else
{ {
if (parameterDescriptor.ParameterType.IsInstanceOfType(message.Value)) var eventData = Convert.ChangeType(message.Value, parameterDescriptor.ParameterType);
{ // 租户数据也可能存在事件数据中
// 租户数据也可能存在事件数据中 if (tenantId == null && eventData is IMultiTenant tenant)
if (tenantId == null && message.Value is IMultiTenant tenant)
{
tenantId = tenant.TenantId;
}
executeParameters[i] = message.Value;
}
else
{ {
var eventData = Convert.ChangeType(message.Value, parameterDescriptor.ParameterType); tenantId = tenant.TenantId;
// 租户数据也可能存在事件数据中
if (tenantId == null && eventData is IMultiTenant tenant)
{
tenantId = tenant.TenantId;
}
executeParameters[i] = eventData;
} }
executeParameters[i] = eventData;
} }
} }
} }
} }
} }
}
// 改变租户 // 改变租户
using (_currentTenant.Change(tenantId)) using (_currentTenant.Change(tenantId))
{ {
var filter = provider.GetService<ISubscribeFilter>(); var filter = provider.GetService<ISubscribeFilter>();
object resultObj = null; object resultObj = null;
try try
{
if (filter != null)
{ {
if (filter != null) var etContext = new ExecutingContext(context, executeParameters);
{ await filter.OnSubscribeExecutingAsync(etContext);
var etContext = new ExecutingContext(context, executeParameters); executeParameters = etContext.Arguments;
await filter.OnSubscribeExecutingAsync(etContext); }
executeParameters = etContext.Arguments;
}
resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
if (filter != null) if (filter != null)
{ {
var edContext = new ExecutedContext(context, resultObj); var edContext = new ExecutedContext(context, resultObj);
await filter.OnSubscribeExecutedAsync(edContext); await filter.OnSubscribeExecutedAsync(edContext);
resultObj = edContext.Result; resultObj = edContext.Result;
}
} }
catch (Exception e) }
catch (Exception e)
{
if (filter != null)
{ {
if (filter != null) var exContext = new ExceptionContext(context, e);
await filter.OnSubscribeExceptionAsync(exContext);
if (!exContext.ExceptionHandled)
{ {
var exContext = new ExceptionContext(context, e); throw exContext.Exception;
await filter.OnSubscribeExceptionAsync(exContext);
if (!exContext.ExceptionHandled)
{
throw exContext.Exception;
}
if (exContext.Result != null)
{
resultObj = exContext.Result;
}
} }
else
if (exContext.Result != null)
{ {
throw; resultObj = exContext.Result;
} }
} }
else
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName()); {
throw;
}
} }
}
/// <summary> var callbackName = message.GetCallbackName();
/// if (string.IsNullOrEmpty(callbackName))
/// </summary>
/// <param name="parameterDescriptor"></param>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message,
CancellationToken cancellationToken)
{
if (typeof(CancellationToken).IsAssignableFrom(parameterDescriptor.ParameterType))
{ {
return cancellationToken; return new ConsumerExecutedResult(resultObj, message.GetId(), null, null);
} }
else
if (parameterDescriptor.ParameterType.IsAssignableFrom(typeof(CapHeader)))
{ {
return new CapHeader(message.Headers); var capHeader = executeParameters.FirstOrDefault(x => x is CapHeader) as CapHeader;
IDictionary<string, string> callbackHeader = null;
// TODO: CapHeader.ResponseHeader
return new ConsumerExecutedResult(resultObj, message.GetId(), callbackName, callbackHeader);
} }
throw new ArgumentException(parameterDescriptor.Name);
} }
/// <summary> }
/// 获取事件处理类实例 /// <summary>
/// </summary> ///
/// <param name="provider"></param> /// </summary>
/// <param name="context"></param> /// <param name="parameterDescriptor"></param>
/// <returns></returns> /// <param name="message"></param>
protected virtual object GetInstance(IServiceProvider provider, ConsumerContext context) /// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message,
CancellationToken cancellationToken)
{
if (typeof(CancellationToken).IsAssignableFrom(parameterDescriptor.ParameterType))
{ {
var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType(); return cancellationToken;
var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType(); }
object obj = null; if (parameterDescriptor.ParameterType.IsAssignableFrom(typeof(CapHeader)))
if (srvType != null) {
{ return new CapHeader(message.Headers);
obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType); }
}
if (obj == null) throw new ArgumentException(parameterDescriptor.Name);
{ }
obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType); /// <summary>
} /// 获取事件处理类实例
/// </summary>
/// <param name="provider"></param>
/// <param name="context"></param>
/// <returns></returns>
protected virtual object GetInstance(IServiceProvider provider, ConsumerContext context)
{
var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType();
var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType();
return obj; object obj = null;
if (srvType != null)
{
obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType);
} }
/// <summary>
/// 通过给定的类型实例与参数调用订阅者方法 if (obj == null)
/// </summary>
/// <param name="executor"></param>
/// <param name="class"></param>
/// <param name="parameter"></param>
/// <returns></returns>
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter)
{ {
if (executor.IsMethodAsync) obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType);
{ }
return await executor.ExecuteAsync(@class, parameter);
}
return executor.Execute(@class, parameter); return obj;
}
/// <summary>
/// 通过给定的类型实例与参数调用订阅者方法
/// </summary>
/// <param name="executor"></param>
/// <param name="class"></param>
/// <param name="parameter"></param>
/// <returns></returns>
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter)
{
if (executor.IsMethodAsync)
{
return await executor.ExecuteAsync(@class, parameter);
} }
return executor.Execute(@class, parameter);
} }
} }

111
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/AbpCapSerializer.cs

@ -9,81 +9,80 @@ using System.Threading.Tasks;
using Volo.Abp.Json; using Volo.Abp.Json;
using Volo.Abp.Json.SystemTextJson; using Volo.Abp.Json.SystemTextJson;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
public class AbpCapSerializer : ISerializer
{ {
public class AbpCapSerializer : ISerializer private readonly IJsonSerializer _jsonSerializer;
private readonly AbpSystemTextJsonSerializerOptions _jsonSerializerOptions;
public AbpCapSerializer(
IJsonSerializer jsonSerializer,
IOptions<AbpSystemTextJsonSerializerOptions> options)
{ {
private readonly IJsonSerializer _jsonSerializer; _jsonSerializer = jsonSerializer;
private readonly AbpSystemTextJsonSerializerOptions _jsonSerializerOptions; _jsonSerializerOptions = options.Value;
}
public AbpCapSerializer( public ValueTask<TransportMessage> SerializeAsync(Message message)
IJsonSerializer jsonSerializer, {
IOptions<AbpSystemTextJsonSerializerOptions> options) if (message == null)
{ {
_jsonSerializer = jsonSerializer; throw new ArgumentNullException(nameof(message));
_jsonSerializerOptions = options.Value;
} }
public ValueTask<TransportMessage> SerializeAsync(Message message) if (message.Value == null)
{ {
if (message == null) return ValueTask.FromResult(new TransportMessage(message.Headers, null));
{
throw new ArgumentNullException(nameof(message));
}
if (message.Value == null)
{
return ValueTask.FromResult(new TransportMessage(message.Headers, null));
}
var messageStr = _jsonSerializer.Serialize(message.Value);
var jsonBytes = Encoding.UTF8.GetBytes(messageStr);
// var jsonBytes = JsonSerializer.SerializeToUtf8Bytes(message.Value, _jsonSerializerOptions.JsonSerializerOptions);
return ValueTask.FromResult(new TransportMessage(message.Headers, jsonBytes));
} }
var messageStr = _jsonSerializer.Serialize(message.Value);
var jsonBytes = Encoding.UTF8.GetBytes(messageStr);
// var jsonBytes = JsonSerializer.SerializeToUtf8Bytes(message.Value, _jsonSerializerOptions.JsonSerializerOptions);
return ValueTask.FromResult(new TransportMessage(message.Headers, jsonBytes));
}
public ValueTask<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType) public ValueTask<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType)
{
if (valueType == null || ReadOnlyMemory<byte>.Empty.Equals(transportMessage.Body) || transportMessage.Body.Length == 0)
{ {
if (valueType == null || ReadOnlyMemory<byte>.Empty.Equals(transportMessage.Body) || transportMessage.Body.Length == 0) return ValueTask.FromResult(new Message(transportMessage.Headers, null));
{ }
return ValueTask.FromResult(new Message(transportMessage.Headers, null));
}
var messageBytes = Encoding.UTF8.GetString(transportMessage.Body.Span); var messageBytes = Encoding.UTF8.GetString(transportMessage.Body.Span);
var obj = _jsonSerializer.Deserialize(valueType, messageBytes); var obj = _jsonSerializer.Deserialize(valueType, messageBytes);
// var obj = JsonSerializer.Deserialize(transportMessage.Body, valueType, _jsonSerializerOptions.JsonSerializerOptions); // var obj = JsonSerializer.Deserialize(transportMessage.Body, valueType, _jsonSerializerOptions.JsonSerializerOptions);
return ValueTask.FromResult(new Message(transportMessage.Headers, obj)); return ValueTask.FromResult(new Message(transportMessage.Headers, obj));
} }
public string Serialize(Message message) public string Serialize(Message message)
{ {
return _jsonSerializer.Serialize(message); return _jsonSerializer.Serialize(message);
// return JsonSerializer.Serialize(message, _jsonSerializerOptions.JsonSerializerOptions); // return JsonSerializer.Serialize(message, _jsonSerializerOptions.JsonSerializerOptions);
} }
public Message Deserialize(string json) public Message Deserialize(string json)
{ {
return _jsonSerializer.Deserialize<Message>(json); return _jsonSerializer.Deserialize<Message>(json);
// return JsonSerializer.Deserialize<Message>(json, _jsonSerializerOptions.JsonSerializerOptions); // return JsonSerializer.Deserialize<Message>(json, _jsonSerializerOptions.JsonSerializerOptions);
} }
public object Deserialize(object value, Type valueType) public object Deserialize(object value, Type valueType)
{
if (value is JsonElement jToken)
{ {
if (value is JsonElement jToken) var bufferWriter = new ArrayBufferWriter<byte>();
using (var writer = new Utf8JsonWriter(bufferWriter))
{ {
var bufferWriter = new ArrayBufferWriter<byte>(); jToken.WriteTo(writer);
using (var writer = new Utf8JsonWriter(bufferWriter))
{
jToken.WriteTo(writer);
}
return JsonSerializer.Deserialize(bufferWriter.WrittenSpan, valueType, _jsonSerializerOptions.JsonSerializerOptions);
} }
throw new NotSupportedException("Type is not of type JToken"); return JsonSerializer.Deserialize(bufferWriter.WrittenSpan, valueType, _jsonSerializerOptions.JsonSerializerOptions);
} }
throw new NotSupportedException("Type is not of type JToken");
}
public bool IsJsonType(object jsonObject) public bool IsJsonType(object jsonObject)
{ {
return jsonObject is JsonElement; return jsonObject is JsonElement;
}
} }
} }

431
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CAPDistributedEventBus.cs

@ -22,256 +22,255 @@ using Volo.Abp.Tracing;
using Volo.Abp.Uow; using Volo.Abp.Uow;
using Volo.Abp.Users; using Volo.Abp.Users;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// CAP分布式事件总线
/// </summary>
[Dependency(ServiceLifetime.Singleton, ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(CAPDistributedEventBus))]
public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEventBus
{ {
/// <summary> /// <summary>
/// CAP分布式事件总线 /// CAP消息发布接口
/// </summary>
protected readonly ICapPublisher CapPublisher;
/// <summary>
/// 自定义事件注册接口
/// </summary>
protected ICustomDistributedEventSubscriber CustomDistributedEventSubscriber { get; }
/// <summary>
/// 本地事件处理器工厂对象集合
/// </summary>
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
/// <summary>
/// 本地事件集合
/// </summary>
protected ConcurrentDictionary<string, Type> EventTypes { get; }
/// <summary>
/// 当前用户
/// </summary>
protected ICurrentUser CurrentUser { get; }
/// <summary>
/// 当前客户端
/// </summary> /// </summary>
[Dependency(ServiceLifetime.Singleton, ReplaceServices = true)] protected ICurrentClient CurrentClient { get; }
[ExposeServices(typeof(IDistributedEventBus), typeof(CAPDistributedEventBus))] /// <summary>
public class CAPDistributedEventBus : DistributedEventBusBase, IDistributedEventBus /// typeof <see cref="IJsonSerializer"/>
/// </summary>
protected IJsonSerializer JsonSerializer { get; }
/// <summary>
/// 取消令牌
/// </summary>
protected ICancellationTokenProvider CancellationTokenProvider { get; }
/// <summary>
/// constructor
/// </summary>
/// <param name="serviceScopeFactory"></param>
/// <param name="distributedEventBusOptions"></param>
/// <param name="capPublisher"></param>
/// <param name="currentUser"></param>
/// <param name="currentClient"></param>
/// <param name="currentTenant"></param>
/// <param name="jsonSerializer"></param>
/// <param name="unitOfWorkManager"></param>
/// <param name="cancellationTokenProvider"></param>
/// <param name="guidGenerator"></param>
/// <param name="clock"></param>
/// <param name="customDistributedEventSubscriber"></param>
/// <param name="eventHandlerInvoker"></param>
/// <param name="localEventBus"></param>
/// <param name="correlationIdProvider"></param>
public CAPDistributedEventBus(IServiceScopeFactory serviceScopeFactory,
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
ICapPublisher capPublisher,
ICurrentUser currentUser,
ICurrentClient currentClient,
ICurrentTenant currentTenant,
IJsonSerializer jsonSerializer,
IUnitOfWorkManager unitOfWorkManager,
IGuidGenerator guidGenerator,
IClock clock,
ICancellationTokenProvider cancellationTokenProvider,
ICustomDistributedEventSubscriber customDistributedEventSubscriber,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider)
: base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager,
distributedEventBusOptions,
guidGenerator,
clock,
eventHandlerInvoker,
localEventBus,
correlationIdProvider)
{ {
/// <summary> CapPublisher = capPublisher;
/// CAP消息发布接口 CurrentUser = currentUser;
/// </summary> CurrentClient = currentClient;
protected readonly ICapPublisher CapPublisher; JsonSerializer = jsonSerializer;
/// <summary> CancellationTokenProvider = cancellationTokenProvider;
/// 自定义事件注册接口 CustomDistributedEventSubscriber = customDistributedEventSubscriber;
/// </summary> HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
protected ICustomDistributedEventSubscriber CustomDistributedEventSubscriber { get; } EventTypes = new ConcurrentDictionary<string, Type>();
/// <summary> }
/// 本地事件处理器工厂对象集合 /// <summary>
/// </summary> /// 订阅事件
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; } /// </summary>
/// <summary> /// <param name="eventType"></param>
/// 本地事件集合 /// <param name="factory"></param>
/// </summary> /// <returns></returns>
protected ConcurrentDictionary<string, Type> EventTypes { get; } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
/// <summary> {
/// 当前用户 return NullDisposable.Instance;
/// </summary> }
protected ICurrentUser CurrentUser { get; } /// <summary>
/// <summary> /// 退订事件
/// 当前客户端 /// </summary>
/// </summary> /// <typeparam name="TEvent">事件类型</typeparam>
protected ICurrentClient CurrentClient { get; } /// <param name="action"></param>
/// <summary> public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
/// typeof <see cref="IJsonSerializer"/> {
/// </summary> }
protected IJsonSerializer JsonSerializer { get; } /// <summary>
/// <summary> /// 退订事件
/// 取消令牌 /// </summary>
/// </summary> /// <param name="eventType">事件类型</param>
protected ICancellationTokenProvider CancellationTokenProvider { get; } /// <param name="handler">事件处理器</param>
/// <summary> public override void Unsubscribe(Type eventType, IEventHandler handler)
/// constructor {
/// </summary> }
/// <param name="serviceScopeFactory"></param> /// <summary>
/// <param name="distributedEventBusOptions"></param> /// 退订事件
/// <param name="capPublisher"></param> /// </summary>
/// <param name="currentUser"></param> /// <param name="eventType">事件类型</param>
/// <param name="currentClient"></param> /// <param name="factory">事件处理器工厂</param>
/// <param name="currentTenant"></param> public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
/// <param name="jsonSerializer"></param> {
/// <param name="unitOfWorkManager"></param> }
/// <param name="cancellationTokenProvider"></param> /// <summary>
/// <param name="guidGenerator"></param> /// 退订所有事件
/// <param name="clock"></param> /// </summary>
/// <param name="customDistributedEventSubscriber"></param> /// <param name="eventType">事件类型</param>
/// <param name="eventHandlerInvoker"></param> public override void UnsubscribeAll(Type eventType)
/// <param name="localEventBus"></param> {
/// <param name="correlationIdProvider"></param> }
public CAPDistributedEventBus(IServiceScopeFactory serviceScopeFactory, /// <summary>
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions, /// 发布事件
ICapPublisher capPublisher, /// </summary>
ICurrentUser currentUser, /// <param name="eventType">事件类型</param>
ICurrentClient currentClient, /// <param name="eventData">事件数据对象</param>
ICurrentTenant currentTenant, /// <returns></returns>
IJsonSerializer jsonSerializer, protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
IUnitOfWorkManager unitOfWorkManager, {
IGuidGenerator guidGenerator, var eventName = EventNameAttribute.GetNameOrDefault(eventType);
IClock clock, await PublishAsync(eventName, eventData);
ICancellationTokenProvider cancellationTokenProvider, }
ICustomDistributedEventSubscriber customDistributedEventSubscriber,
IEventHandlerInvoker eventHandlerInvoker, /// <summary>
ILocalEventBus localEventBus, /// 获取事件处理器工厂列表
ICorrelationIdProvider correlationIdProvider) /// </summary>
: base( /// <param name="eventType"></param>
serviceScopeFactory, /// <returns></returns>
currentTenant, protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
unitOfWorkManager, {
distributedEventBusOptions, var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
guidGenerator,
clock, foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
eventHandlerInvoker,
localEventBus,
correlationIdProvider)
{
CapPublisher = capPublisher;
CurrentUser = currentUser;
CurrentClient = currentClient;
JsonSerializer = jsonSerializer;
CancellationTokenProvider = cancellationTokenProvider;
CustomDistributedEventSubscriber = customDistributedEventSubscriber;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
}
/// <summary>
/// 订阅事件
/// </summary>
/// <param name="eventType"></param>
/// <param name="factory"></param>
/// <returns></returns>
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
return NullDisposable.Instance;
}
/// <summary>
/// 退订事件
/// </summary>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <param name="action"></param>
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
}
/// <summary>
/// 退订事件
/// </summary>
/// <param name="eventType">事件类型</param>
/// <param name="handler">事件处理器</param>
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
}
/// <summary>
/// 退订事件
/// </summary>
/// <param name="eventType">事件类型</param>
/// <param name="factory">事件处理器工厂</param>
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
}
/// <summary>
/// 退订所有事件
/// </summary>
/// <param name="eventType">事件类型</param>
public override void UnsubscribeAll(Type eventType)
{ {
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
} }
/// <summary>
/// 发布事件 return handlerFactoryList.ToArray();
/// </summary> }
/// <param name="eventType">事件类型</param>
/// <param name="eventData">事件数据对象</param> private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
/// <returns></returns> {
protected override async Task PublishToEventBusAsync(Type eventType, object eventData) //Should trigger same type
if (handlerEventType == targetEventType)
{ {
var eventName = EventNameAttribute.GetNameOrDefault(eventType); return true;
await PublishAsync(eventName, eventData);
} }
/// <summary> //TODO: Support inheritance? But it does not support on subscription to RabbitMq!
/// 获取事件处理器工厂列表 //Should trigger for inherited types
/// </summary> if (handlerEventType.IsAssignableFrom(targetEventType))
/// <param name="eventType"></param>
/// <returns></returns>
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{ {
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>(); return true;
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
return handlerFactoryList.ToArray();
} }
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) return false;
{ }
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}
//TODO: Support inheritance? But it does not support on subscription to RabbitMq! public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
//Should trigger for inherited types {
if (handlerEventType.IsAssignableFrom(targetEventType)) await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData);
{ }
return true;
}
return false; public override async Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
{
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName);
if (eventType == null)
{
return;
} }
public override async Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig) var eventJson = Encoding.UTF8.GetString(incomingEvent.EventData);
var eventData = JsonSerializer.Deserialize(eventType, eventJson);
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
if (exceptions.Any())
{ {
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData); ThrowOriginalExceptions(eventType, exceptions);
} }
}
public override async Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig) protected override byte[] Serialize(object eventData)
{ {
var eventType = EventTypes.GetOrDefault(incomingEvent.EventName); var eventJson = JsonSerializer.Serialize(eventData);
if (eventType == null)
{
return;
}
var eventJson = Encoding.UTF8.GetString(incomingEvent.EventData); return Encoding.UTF8.GetBytes(eventJson);
var eventData = JsonSerializer.Deserialize(eventType, eventJson); }
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}
protected override byte[] Serialize(object eventData) protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord)
{ {
var eventJson = JsonSerializer.Serialize(eventData); unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}
return Encoding.UTF8.GetBytes(eventJson); protected async Task PublishAsync(string eventName, object eventData)
} {
await CapPublisher
.PublishAsync(
eventName, eventData,
new Dictionary<string, string>
{
{ AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" },
{ AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" },
{ AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" },
},
CancellationTokenProvider.FallbackToProvider());
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventRecord eventRecord) public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{ {
unitOfWork.AddOrReplaceDistributedEvent(eventRecord); var outgoingEventArray = outgoingEvents.ToArray();
}
protected async Task PublishAsync(string eventName, object eventData) foreach (var outgoingEvent in outgoingEventArray)
{ {
await CapPublisher await CapPublisher
.PublishAsync( .PublishAsync(
eventName, eventData, outgoingEvent.EventName,
outgoingEvent.EventData,
new Dictionary<string, string> new Dictionary<string, string>
{ {
{ AbpCAPHeaders.MessageId, outgoingEvent.Id.ToString() },
{ AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" }, { AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" },
{ AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" }, { AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" },
{ AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" }, { AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" },
}, },
CancellationTokenProvider.FallbackToProvider()); CancellationTokenProvider.FallbackToProvider());
} }
public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
{
var outgoingEventArray = outgoingEvents.ToArray();
foreach (var outgoingEvent in outgoingEventArray)
{
await CapPublisher
.PublishAsync(
outgoingEvent.EventName,
outgoingEvent.EventData,
new Dictionary<string, string>
{
{ AbpCAPHeaders.MessageId, outgoingEvent.Id.ToString() },
{ AbpCAPHeaders.UserId, CurrentUser.Id?.ToString() ?? "" },
{ AbpCAPHeaders.ClientId, CurrentClient.Id ?? "" },
{ AbpCAPHeaders.TenantId, CurrentTenant.Id?.ToString() ?? "" },
},
CancellationTokenProvider.FallbackToProvider());
}
}
} }
} }

199
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/CustomDistributedEventSubscriber.cs

@ -13,125 +13,124 @@ using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Threading; using Volo.Abp.Threading;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
{
internal class CustomDistributedEventSubscriber : ICustomDistributedEventSubscriber, ISingletonDependency
{
protected CapOptions CapOptions { get; }
protected IConsumerClientFactory ConsumerClientFactory { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; } internal class CustomDistributedEventSubscriber : ICustomDistributedEventSubscriber, ISingletonDependency
protected ConcurrentDictionary<string, CancellationTokenSource> EventStopingTokens { get; } {
public CustomDistributedEventSubscriber( protected CapOptions CapOptions { get; }
IOptions<CapOptions> capOptions, protected IConsumerClientFactory ConsumerClientFactory { get; }
IConsumerClientFactory consumerClientFactory)
{
CapOptions = capOptions.Value;
ConsumerClientFactory = consumerClientFactory;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>(); protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
EventStopingTokens = new ConcurrentDictionary<string, CancellationTokenSource>(); protected ConcurrentDictionary<string, CancellationTokenSource> EventStopingTokens { get; }
} public CustomDistributedEventSubscriber(
IOptions<CapOptions> capOptions,
IConsumerClientFactory consumerClientFactory)
{
CapOptions = capOptions.Value;
ConsumerClientFactory = consumerClientFactory;
public void Subscribe(Type eventType, IEventHandlerFactory factory) HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
{ EventStopingTokens = new ConcurrentDictionary<string, CancellationTokenSource>();
GetOrCreateHandlerFactories(eventType) }
.Locking(factories =>
{
if (!factories.Contains(factory))
{
factories.Add(factory);
// TODO 客户端订阅
}
});
}
public void UnSubscribe(Type eventType, IEventHandlerFactory factory) public void Subscribe(Type eventType, IEventHandlerFactory factory)
{ {
GetOrCreateHandlerFactories(eventType) GetOrCreateHandlerFactories(eventType)
.Locking(factories => .Locking(factories =>
{
if (!factories.Contains(factory))
{ {
if (factories.Contains(factory)) factories.Add(factory);
{ // TODO 客户端订阅
factories.Remove(factory); }
// TODO 客户端退订 });
} }
});
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType) public void UnSubscribe(Type eventType, IEventHandlerFactory factory)
{ {
return HandlerFactories.GetOrAdd( GetOrCreateHandlerFactories(eventType)
eventType, .Locking(factories =>
type => {
if (factories.Contains(factory))
{ {
var eventName = EventNameAttribute.GetNameOrDefault(type); factories.Remove(factory);
EventStopingTokens[eventName] = new CancellationTokenSource(); // TODO 客户端退订
return new List<IEventHandlerFactory>();
} }
); });
} }
private IEnumerable<ConsumerExecutorDescriptor> GetHandlerDescription(Type eventType, Type typeInfo) private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{ {
var serviceTypeInfo = typeof(IDistributedEventHandler<>) return HandlerFactories.GetOrAdd(
.MakeGenericType(eventType); eventType,
var method = typeInfo type =>
.GetMethod( {
nameof(IDistributedEventHandler<object>.HandleEventAsync), var eventName = EventNameAttribute.GetNameOrDefault(type);
new[] { eventType } EventStopingTokens[eventName] = new CancellationTokenSource();
); return new List<IEventHandlerFactory>();
var eventName = EventNameAttribute.GetNameOrDefault(eventType); }
var topicAttr = method.GetCustomAttributes<TopicAttribute>(true); );
var topicAttributes = topicAttr.ToList(); }
private IEnumerable<ConsumerExecutorDescriptor> GetHandlerDescription(Type eventType, Type typeInfo)
{
var serviceTypeInfo = typeof(IDistributedEventHandler<>)
.MakeGenericType(eventType);
var method = typeInfo
.GetMethod(
nameof(IDistributedEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var topicAttr = method.GetCustomAttributes<TopicAttribute>(true);
var topicAttributes = topicAttr.ToList();
topicAttributes.Add(new CapSubscribeAttribute(eventName)); topicAttributes.Add(new CapSubscribeAttribute(eventName));
foreach (var attr in topicAttributes) foreach (var attr in topicAttributes)
{ {
SetSubscribeAttribute(attr); SetSubscribeAttribute(attr);
var parameters = method.GetParameters() var parameters = method.GetParameters()
.Select(parameter => new ParameterDescriptor .Select(parameter => new ParameterDescriptor
{ {
Name = parameter.Name, Name = parameter.Name,
ParameterType = parameter.ParameterType, ParameterType = parameter.ParameterType,
IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any() IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any()
}).ToList(); }).ToList();
yield return InitDescriptor(attr, method, typeInfo.GetTypeInfo(), serviceTypeInfo.GetTypeInfo(), parameters); yield return InitDescriptor(attr, method, typeInfo.GetTypeInfo(), serviceTypeInfo.GetTypeInfo(), parameters);
}
} }
}
private void SetSubscribeAttribute(TopicAttribute attribute) private void SetSubscribeAttribute(TopicAttribute attribute)
{
if (attribute.Group == null)
{ {
if (attribute.Group == null) attribute.Group = CapOptions.DefaultGroupName + "." + CapOptions.Version;
{
attribute.Group = CapOptions.DefaultGroupName + "." + CapOptions.Version;
}
else
{
attribute.Group = attribute.Group + "." + CapOptions.Version;
}
} }
else
{
attribute.Group = attribute.Group + "." + CapOptions.Version;
}
}
private ConsumerExecutorDescriptor InitDescriptor( private ConsumerExecutorDescriptor InitDescriptor(
TopicAttribute attr, TopicAttribute attr,
MethodInfo methodInfo, MethodInfo methodInfo,
TypeInfo implType, TypeInfo implType,
TypeInfo serviceTypeInfo, TypeInfo serviceTypeInfo,
IList<ParameterDescriptor> parameters) IList<ParameterDescriptor> parameters)
{
var descriptor = new ConsumerExecutorDescriptor
{ {
var descriptor = new ConsumerExecutorDescriptor Attribute = attr,
{ MethodInfo = methodInfo,
Attribute = attr, ImplTypeInfo = implType,
MethodInfo = methodInfo, ServiceTypeInfo = serviceTypeInfo,
ImplTypeInfo = implType, Parameters = parameters
ServiceTypeInfo = serviceTypeInfo, };
Parameters = parameters
};
return descriptor; return descriptor;
}
} }
} }

37
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/FailedThresholdCallbackNotifier.cs

@ -3,29 +3,28 @@ using System.Threading.Tasks;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.ExceptionHandling; using Volo.Abp.ExceptionHandling;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
[DisableConventionalRegistration]
public class FailedThresholdCallbackNotifier : IFailedThresholdCallbackNotifier
{ {
[DisableConventionalRegistration] protected AbpCAPEventBusOptions Options { get; }
public class FailedThresholdCallbackNotifier : IFailedThresholdCallbackNotifier protected IExceptionNotifier ExceptionNotifier { get; }
{
protected AbpCAPEventBusOptions Options { get; }
protected IExceptionNotifier ExceptionNotifier { get; }
public FailedThresholdCallbackNotifier( public FailedThresholdCallbackNotifier(
IOptions<AbpCAPEventBusOptions> options, IOptions<AbpCAPEventBusOptions> options,
IExceptionNotifier exceptionNotifier) IExceptionNotifier exceptionNotifier)
{ {
Options = options.Value; Options = options.Value;
ExceptionNotifier = exceptionNotifier; ExceptionNotifier = exceptionNotifier;
} }
public async virtual Task NotifyAsync(AbpCAPExecutionFailedException exception) public async virtual Task NotifyAsync(AbpCAPExecutionFailedException exception)
{
// 通过额外的选项来控制是否发送消息处理失败的事件
if (Options.NotifyFailedCallback)
{ {
// 通过额外的选项来控制是否发送消息处理失败的事件 await ExceptionNotifier.NotifyAsync(exception);
if (Options.NotifyFailedCallback)
{
await ExceptionNotifier.NotifyAsync(exception);
}
} }
} }
} }

33
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/ICustomDistributedEventSubscriber.cs

@ -1,24 +1,23 @@
using System; using System;
using Volo.Abp.EventBus; using Volo.Abp.EventBus;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
/// <summary>
/// 自定义事件订阅者
/// </summary>
public interface ICustomDistributedEventSubscriber
{ {
/// <summary> /// <summary>
/// 自定义事件订阅者 /// 订阅事件
/// </summary>
/// <param name="eventType"></param>
/// <param name="factory"></param>
void Subscribe(Type eventType, IEventHandlerFactory factory);
/// <summary>
/// 取消订阅
/// </summary> /// </summary>
public interface ICustomDistributedEventSubscriber /// <param name="eventType"></param>
{ /// <param name="factory"></param>
/// <summary> void UnSubscribe(Type eventType, IEventHandlerFactory factory);
/// 订阅事件
/// </summary>
/// <param name="eventType"></param>
/// <param name="factory"></param>
void Subscribe(Type eventType, IEventHandlerFactory factory);
/// <summary>
/// 取消订阅
/// </summary>
/// <param name="eventType"></param>
/// <param name="factory"></param>
void UnSubscribe(Type eventType, IEventHandlerFactory factory);
}
} }

9
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/IFailedThresholdCallbackNotifier.cs

@ -1,9 +1,8 @@
using System.Threading.Tasks; using System.Threading.Tasks;
namespace LINGYUN.Abp.EventBus.CAP namespace LINGYUN.Abp.EventBus.CAP;
public interface IFailedThresholdCallbackNotifier
{ {
public interface IFailedThresholdCallbackNotifier Task NotifyAsync(AbpCAPExecutionFailedException exception);
{
Task NotifyAsync(AbpCAPExecutionFailedException exception);
}
} }

194
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/AwaitableInfo.cs

@ -6,123 +6,111 @@ using System.Linq;
using System.Reflection; using System.Reflection;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
namespace LINGYUN.Abp.EventBus.CAP.Internal namespace LINGYUN.Abp.EventBus.CAP.Internal;
internal readonly struct AwaitableInfo
{ {
internal struct AwaitableInfo public Type AwaiterType { get; }
public PropertyInfo AwaiterIsCompletedProperty { get; }
public MethodInfo AwaiterGetResultMethod { get; }
public MethodInfo AwaiterOnCompletedMethod { get; }
public MethodInfo AwaiterUnsafeOnCompletedMethod { get; }
public Type ResultType { get; }
public MethodInfo GetAwaiterMethod { get; }
public AwaitableInfo(
Type awaiterType,
PropertyInfo awaiterIsCompletedProperty,
MethodInfo awaiterGetResultMethod,
MethodInfo awaiterOnCompletedMethod,
MethodInfo awaiterUnsafeOnCompletedMethod,
Type resultType,
MethodInfo getAwaiterMethod)
{ {
public Type AwaiterType { get; } AwaiterType = awaiterType;
public PropertyInfo AwaiterIsCompletedProperty { get; } AwaiterIsCompletedProperty = awaiterIsCompletedProperty;
public MethodInfo AwaiterGetResultMethod { get; } AwaiterGetResultMethod = awaiterGetResultMethod;
public MethodInfo AwaiterOnCompletedMethod { get; } AwaiterOnCompletedMethod = awaiterOnCompletedMethod;
public MethodInfo AwaiterUnsafeOnCompletedMethod { get; } AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod;
public Type ResultType { get; } ResultType = resultType;
public MethodInfo GetAwaiterMethod { get; } GetAwaiterMethod = getAwaiterMethod;
}
public AwaitableInfo( public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo)
Type awaiterType, {
PropertyInfo awaiterIsCompletedProperty, // Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347
MethodInfo awaiterGetResultMethod,
MethodInfo awaiterOnCompletedMethod,
MethodInfo awaiterUnsafeOnCompletedMethod,
Type resultType,
MethodInfo getAwaiterMethod)
{
AwaiterType = awaiterType;
AwaiterIsCompletedProperty = awaiterIsCompletedProperty;
AwaiterGetResultMethod = awaiterGetResultMethod;
AwaiterOnCompletedMethod = awaiterOnCompletedMethod;
AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod;
ResultType = resultType;
GetAwaiterMethod = getAwaiterMethod;
}
public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo) // Awaitable must have method matching "object GetAwaiter()"
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0
&& m.ReturnType != null);
if (getAwaiterMethod == null)
{ {
// Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347 awaitableInfo = default;
return false;
}
// Awaitable must have method matching "object GetAwaiter()" var awaiterType = getAwaiterMethod.ReturnType;
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0
&& m.ReturnType != null);
if (getAwaiterMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
var awaiterType = getAwaiterMethod.ReturnType; // Awaiter must have property matching "bool IsCompleted { get; }"
var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p =>
p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase)
&& p.PropertyType == typeof(bool)
&& p.GetMethod != null);
if (isCompletedProperty == null)
{
awaitableInfo = default;
return false;
}
// Awaiter must have property matching "bool IsCompleted { get; }" // Awaiter must implement INotifyCompletion
var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p => var awaiterInterfaces = awaiterType.GetInterfaces();
p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase) var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion));
&& p.PropertyType == typeof(bool) if (!implementsINotifyCompletion)
&& p.GetMethod != null); {
if (isCompletedProperty == null) awaitableInfo = default;
{ return false;
awaitableInfo = default(AwaitableInfo); }
return false;
}
// Awaiter must implement INotifyCompletion // INotifyCompletion supplies a method matching "void OnCompleted(Action action)"
var awaiterInterfaces = awaiterType.GetInterfaces(); var onCompletedMethod = typeof(INotifyCompletion).GetRuntimeMethods().Single(m =>
var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion)); m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase)
if (!implementsINotifyCompletion) && m.ReturnType == typeof(void)
{ && m.GetParameters().Length == 1
awaitableInfo = default(AwaitableInfo); && m.GetParameters()[0].ParameterType == typeof(Action));
return false;
}
// INotifyCompletion supplies a method matching "void OnCompleted(Action action)" // Awaiter optionally implements ICriticalNotifyCompletion
var iNotifyCompletionMap = awaiterType var implementsICriticalNotifyCompletion = awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion));
.GetTypeInfo() MethodInfo unsafeOnCompletedMethod;
.GetRuntimeInterfaceMap(typeof(INotifyCompletion)); if (implementsICriticalNotifyCompletion)
var onCompletedMethod = iNotifyCompletionMap.InterfaceMethods.Single(m => // ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)"
m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase) unsafeOnCompletedMethod = typeof(ICriticalNotifyCompletion).GetRuntimeMethods().Single(m =>
m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void) && m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1 && m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action)); && m.GetParameters()[0].ParameterType == typeof(Action));
else
unsafeOnCompletedMethod = null;
// Awaiter optionally implements ICriticalNotifyCompletion // Awaiter must have method matching "void GetResult" or "T GetResult()"
var implementsICriticalNotifyCompletion = var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m =>
awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion)); m.Name.Equals("GetResult")
MethodInfo unsafeOnCompletedMethod; && m.GetParameters().Length == 0);
if (implementsICriticalNotifyCompletion) if (getResultMethod == null)
{ {
// ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)" awaitableInfo = default;
var iCriticalNotifyCompletionMap = awaiterType return false;
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(ICriticalNotifyCompletion));
unsafeOnCompletedMethod = iCriticalNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
}
else
{
unsafeOnCompletedMethod = null;
}
// Awaiter must have method matching "void GetResult" or "T GetResult()"
var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetResult")
&& m.GetParameters().Length == 0);
if (getResultMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
awaitableInfo = new AwaitableInfo(
awaiterType,
isCompletedProperty,
getResultMethod,
onCompletedMethod,
unsafeOnCompletedMethod,
getResultMethod.ReturnType,
getAwaiterMethod);
return true;
} }
awaitableInfo = new AwaitableInfo(
awaiterType,
isCompletedProperty,
getResultMethod,
onCompletedMethod,
unsafeOnCompletedMethod,
getResultMethod.ReturnType,
getAwaiterMethod);
return true;
} }
} }

69
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/CoercedAwaitableInfo.cs

@ -4,53 +4,50 @@
using System; using System;
using System.Linq.Expressions; using System.Linq.Expressions;
namespace LINGYUN.Abp.EventBus.CAP.Internal namespace LINGYUN.Abp.EventBus.CAP.Internal;
internal readonly struct CoercedAwaitableInfo
{ {
internal struct CoercedAwaitableInfo public AwaitableInfo AwaitableInfo { get; }
public Expression CoercerExpression { get; }
public Type CoercerResultType { get; }
public bool RequiresCoercion => CoercerExpression != null;
public CoercedAwaitableInfo(AwaitableInfo awaitableInfo)
{ {
public AwaitableInfo AwaitableInfo { get; } AwaitableInfo = awaitableInfo;
public Expression CoercerExpression { get; } CoercerExpression = null;
public Type CoercerResultType { get; } CoercerResultType = null;
public bool RequiresCoercion => CoercerExpression != null; }
public CoercedAwaitableInfo(AwaitableInfo awaitableInfo) public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType,
{ AwaitableInfo coercedAwaitableInfo)
AwaitableInfo = awaitableInfo; {
CoercerExpression = null; CoercerExpression = coercerExpression;
CoercerResultType = null; CoercerResultType = coercerResultType;
} AwaitableInfo = coercedAwaitableInfo;
}
public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType, public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info)
AwaitableInfo coercedAwaitableInfo) {
if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo))
{ {
CoercerExpression = coercerExpression; info = new CoercedAwaitableInfo(directlyAwaitableInfo);
CoercerResultType = coercerResultType; return true;
AwaitableInfo = coercedAwaitableInfo;
} }
public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info) // It's not directly awaitable, but maybe we can coerce it.
{ // Currently we support coercing FSharpAsync<T>.
if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo)) if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type,
{
info = new CoercedAwaitableInfo(directlyAwaitableInfo);
return true;
}
// It's not directly awaitable, but maybe we can coerce it.
// Currently we support coercing FSharpAsync<T>.
if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type,
out var coercerExpression, out var coercerExpression,
out var coercerResultType)) out var coercerResultType))
if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo))
{ {
if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo)) info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo);
{ return true;
info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo);
return true;
}
} }
info = default(CoercedAwaitableInfo); info = default;
return false; return false;
}
} }
} }

568
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/ObjectMethodExecutor.cs

@ -1,337 +1,329 @@
// Copyright (c) .NET Foundation. All rights reserved. // Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
#nullable enable
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Reflection; using System.Reflection;
namespace LINGYUN.Abp.EventBus.CAP.Internal namespace LINGYUN.Abp.EventBus.CAP.Internal;
internal class ObjectMethodExecutor
{ {
internal class ObjectMethodExecutor private static readonly ConstructorInfo _objectMethodExecutorAwaitableConstructor =
{ typeof(ObjectMethodExecutorAwaitable).GetConstructor(new[]
// ReSharper disable once InconsistentNaming
private static readonly ConstructorInfo _objectMethodExecutorAwaitableConstructor =
typeof(ObjectMethodExecutorAwaitable).GetConstructor(new[]
{
typeof(object), // customAwaitable
typeof(Func<object, object>), // getAwaiterMethod
typeof(Func<object, bool>), // isCompletedMethod
typeof(Func<object, object>), // getResultMethod
typeof(Action<object, Action>), // onCompletedMethod
typeof(Action<object, Action>) // unsafeOnCompletedMethod
});
private readonly MethodExecutor _executor;
private readonly MethodExecutorAsync _executorAsync;
private readonly object[] _parameterDefaultValues;
private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo, object[] parameterDefaultValues)
{ {
if (methodInfo == null) typeof(object), // customAwaitable
{ typeof(Func<object, object>), // getAwaiterMethod
throw new ArgumentNullException(nameof(methodInfo)); typeof(Func<object, bool>), // isCompletedMethod
} typeof(Func<object, object>), // getResultMethod
typeof(Action<object, Action>), // onCompletedMethod
typeof(Action<object, Action>) // unsafeOnCompletedMethod
})!;
private readonly MethodExecutor? _executor;
private readonly MethodExecutorAsync? _executorAsync;
private readonly object?[]? _parameterDefaultValues;
private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo, object?[]? parameterDefaultValues)
{
if (methodInfo == null) throw new ArgumentNullException(nameof(methodInfo));
MethodInfo = methodInfo; MethodInfo = methodInfo;
MethodParameters = methodInfo.GetParameters(); MethodParameters = methodInfo.GetParameters();
TargetTypeInfo = targetTypeInfo; TargetTypeInfo = targetTypeInfo;
MethodReturnType = methodInfo.ReturnType; MethodReturnType = methodInfo.ReturnType;
var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(MethodReturnType, out var coercedAwaitableInfo); var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(MethodReturnType, out var coercedAwaitableInfo);
IsMethodAsync = isAwaitable; IsMethodAsync = isAwaitable;
AsyncResultType = isAwaitable ? coercedAwaitableInfo.AwaitableInfo.ResultType : null; AsyncResultType = isAwaitable ? coercedAwaitableInfo.AwaitableInfo.ResultType : null;
// Upstream code may prefer to use the sync-executor even for async methods, because if it knows // Upstream code may prefer to use the sync-executor even for async methods, because if it knows
// that the result is a specific Task<T> where T is known, then it can directly cast to that type // that the result is a specific Task<T> where T is known, then it can directly cast to that type
// and await it without the extra heap allocations involved in the _executorAsync code path. // and await it without the extra heap allocations involved in the _executorAsync code path.
_executor = GetExecutor(methodInfo, targetTypeInfo); _executor = GetExecutor(methodInfo, targetTypeInfo);
if (IsMethodAsync) if (IsMethodAsync) _executorAsync = GetExecutorAsync(methodInfo, targetTypeInfo, coercedAwaitableInfo);
{
_executorAsync = GetExecutorAsync(methodInfo, targetTypeInfo, coercedAwaitableInfo);
}
_parameterDefaultValues = parameterDefaultValues; _parameterDefaultValues = parameterDefaultValues;
} }
public MethodInfo MethodInfo { get; } public MethodInfo MethodInfo { get; }
public ParameterInfo[] MethodParameters { get; } public ParameterInfo[] MethodParameters { get; }
public TypeInfo TargetTypeInfo { get; } public TypeInfo TargetTypeInfo { get; }
public Type AsyncResultType { get; } public Type? AsyncResultType { get; }
// This field is made internal set because it is set in unit tests. // This field is made internal set because it is set in unit tests.
public Type MethodReturnType { get; internal set; } public Type MethodReturnType { get; internal set; }
public bool IsMethodAsync { get; } public bool IsMethodAsync { get; }
public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo) public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{ {
return new ObjectMethodExecutor(methodInfo, targetTypeInfo, null); return new ObjectMethodExecutor(methodInfo, targetTypeInfo, null);
} }
public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo, public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo,
object[] parameterDefaultValues) object?[] parameterDefaultValues)
{ {
if (parameterDefaultValues == null) if (parameterDefaultValues == null) throw new ArgumentNullException(nameof(parameterDefaultValues));
{
throw new ArgumentNullException(nameof(parameterDefaultValues));
}
return new ObjectMethodExecutor(methodInfo, targetTypeInfo, parameterDefaultValues); return new ObjectMethodExecutor(methodInfo, targetTypeInfo, parameterDefaultValues);
} }
/// <summary>
/// Executes the configured method on <paramref name="target" />. This can be used whether or not
/// the configured method is asynchronous.
/// </summary>
/// <remarks>
/// Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than
/// ExecuteAsync if you know at compile time what the return type is, because then you can directly
/// "await" that value (via a cast), and then the generated code will be able to reference the
/// resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated
/// code will have to treat the resulting awaitable as a boxed object, because it doesn't know at
/// compile time what type it would be.
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>The method return value.</returns>
public object? Execute(object target, object?[]? parameters)
{
Debug.Assert(_executor != null, "Sync execution is not supported.");
return _executor(target, parameters);
}
/// <summary>
/// Executes the configured method on <paramref name="target" />. This can only be used if the configured
/// method is asynchronous.
/// </summary>
/// <remarks>
/// If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync,
/// which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations
/// as compared with using Execute and then using "await" on the result value typecasted to the known
/// awaitable type. The possible extra heap allocations are for:
/// 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally
/// it's a reference type, and you normally create a new instance per call).
/// 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance
/// of it, and if it is, it will have to be boxed so the calling code can reference it as an object).
/// 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling
/// code doesn't know what type it's going to be).
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>An object that you can "await" to get the method return value.</returns>
public ObjectMethodExecutorAwaitable ExecuteAsync(object target, object?[]? parameters)
{
Debug.Assert(_executorAsync != null, "Async execution is not supported.");
return _executorAsync(target, parameters);
}
public object? GetDefaultValueForParameter(int index)
{
if (_parameterDefaultValues == null)
throw new InvalidOperationException(
$"Cannot call {nameof(GetDefaultValueForParameter)}, because no parameter default values were supplied.");
if (index < 0 || index > MethodParameters.Length - 1) throw new ArgumentOutOfRangeException(nameof(index));
/// <summary> return _parameterDefaultValues[index];
/// Executes the configured method on <paramref name="target" />. This can be used whether or not }
/// the configured method is asynchronous.
/// </summary> private static MethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo)
/// <remarks> {
/// Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than // Parameters to executor
/// ExecuteAsync if you know at compile time what the return type is, because then you can directly var targetParameter = Expression.Parameter(typeof(object), "target");
/// "await" that value (via a cast), and then the generated code will be able to reference the var parametersParameter = Expression.Parameter(typeof(object?[]), "parameters");
/// resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated
/// code will have to treat the resulting awaitable as a boxed object, because it doesn't know at // Build parameter list
/// compile time what type it would be. var paramInfos = methodInfo.GetParameters();
/// </remarks> var parameters = new List<Expression>(paramInfos.Length);
/// <param name="target">The object whose method is to be executed.</param> for (var i = 0; i < paramInfos.Length; i++)
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>The method return value.</returns>
public object Execute(object target, params object[] parameters)
{ {
return _executor(target, parameters); var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);
// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
} }
/// <summary> // Call method
/// Executes the configured method on <paramref name="target" />. This can only be used if the configured var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
/// method is asynchronous. var methodCall = Expression.Call(instanceCast, methodInfo, parameters);
/// </summary>
/// <remarks> // methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)"
/// If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync, // Create function
/// which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations if (methodCall.Type == typeof(void))
/// as compared with using Execute and then using "await" on the result value typecasted to the known
/// awaitable type. The possible extra heap allocations are for:
/// 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally
/// it's a reference type, and you normally create a new instance per call).
/// 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance
/// of it, and if it is, it will have to be boxed so the calling code can reference it as an object).
/// 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling
/// code doesn't know what type it's going to be).
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>An object that you can "await" to get the method return value.</returns>
public ObjectMethodExecutorAwaitable ExecuteAsync(object target, params object[] parameters)
{ {
return _executorAsync(target, parameters); var lambda = Expression.Lambda<VoidMethodExecutor>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return WrapVoidMethod(voidExecutor);
} }
else
public object GetDefaultValueForParameter(int index)
{ {
if (_parameterDefaultValues == null) // must coerce methodCall to match ActionExecutor signature
{ var castMethodCall = Expression.Convert(methodCall, typeof(object));
throw new InvalidOperationException( var lambda = Expression.Lambda<MethodExecutor>(castMethodCall, targetParameter, parametersParameter);
$"Cannot call {nameof(GetDefaultValueForParameter)}, because no parameter default values were supplied."); return lambda.Compile();
}
if (index < 0 || index > MethodParameters.Length - 1)
{
throw new ArgumentOutOfRangeException(nameof(index));
}
return _parameterDefaultValues[index];
} }
}
private static MethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo) private static MethodExecutor WrapVoidMethod(VoidMethodExecutor executor)
{
return delegate (object target, object?[]? parameters)
{ {
// Parameters to executor executor(target, parameters);
var targetParameter = Expression.Parameter(typeof(object), "target"); return null;
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters"); };
}
// Build parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (var i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);
// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
}
// Call method
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);
// methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)"
// Create function
if (methodCall.Type == typeof(void))
{
var lambda = Expression.Lambda<VoidMethodExecutor>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return WrapVoidMethod(voidExecutor);
}
else
{
// must coerce methodCall to match ActionExecutor signature
var castMethodCall = Expression.Convert(methodCall, typeof(object));
var lambda = Expression.Lambda<MethodExecutor>(castMethodCall, targetParameter, parametersParameter);
return lambda.Compile();
}
}
private static MethodExecutor WrapVoidMethod(VoidMethodExecutor executor) private static MethodExecutorAsync GetExecutorAsync(
MethodInfo methodInfo,
TypeInfo targetTypeInfo,
CoercedAwaitableInfo coercedAwaitableInfo)
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");
// Build parameter list
var paramInfos = methodInfo.GetParameters();
var parameters = new List<Expression>(paramInfos.Length);
for (var i = 0; i < paramInfos.Length; i++)
{ {
return delegate (object target, object[] parameters) var paramInfo = paramInfos[i];
{ var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
executor(target, parameters); var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);
return null;
}; // valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
} }
private static MethodExecutorAsync GetExecutorAsync( // Call method
MethodInfo methodInfo, var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
TypeInfo targetTypeInfo, var methodCall = Expression.Call(instanceCast, methodInfo, parameters);
CoercedAwaitableInfo coercedAwaitableInfo)
{ // Using the method return value, construct an ObjectMethodExecutorAwaitable based on
// Parameters to executor // the info we have about its implementation of the awaitable pattern. Note that all
var targetParameter = Expression.Parameter(typeof(object), "target"); // the funcs/actions we construct here are precompiled, so that only one instance of
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters"); // each is preserved throughout the lifetime of the ObjectMethodExecutor.
// Build parameter list // var getAwaiterFunc = (object awaitable) =>
var parameters = new List<Expression>(); // (object)((CustomAwaitableType)awaitable).GetAwaiter();
var paramInfos = methodInfo.GetParameters(); var customAwaitableParam = Expression.Parameter(typeof(object), "awaitable");
for (var i = 0; i < paramInfos.Length; i++) var awaitableInfo = coercedAwaitableInfo.AwaitableInfo;
{ var postCoercionMethodReturnType = coercedAwaitableInfo.CoercerResultType ?? methodInfo.ReturnType;
var paramInfo = paramInfos[i]; var getAwaiterFunc = Expression.Lambda<Func<object, object>>(
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i)); Expression.Convert(
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType); Expression.Call(
Expression.Convert(customAwaitableParam, postCoercionMethodReturnType),
// valueCast is "(Ti) parameters[i]" awaitableInfo.GetAwaiterMethod),
parameters.Add(valueCast); typeof(object)),
} customAwaitableParam).Compile();
// Call method // var isCompletedFunc = (object awaiter) =>
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType()); // ((CustomAwaiterType)awaiter).IsCompleted;
var methodCall = Expression.Call(instanceCast, methodInfo, parameters); var isCompletedParam = Expression.Parameter(typeof(object), "awaiter");
var isCompletedFunc = Expression.Lambda<Func<object, bool>>(
// Using the method return value, construct an ObjectMethodExecutorAwaitable based on Expression.MakeMemberAccess(
// the info we have about its implementation of the awaitable pattern. Note that all Expression.Convert(isCompletedParam, awaitableInfo.AwaiterType),
// the funcs/actions we construct here are precompiled, so that only one instance of awaitableInfo.AwaiterIsCompletedProperty),
// each is preserved throughout the lifetime of the ObjectMethodExecutor. isCompletedParam).Compile();
// var getAwaiterFunc = (object awaitable) => var getResultParam = Expression.Parameter(typeof(object), "awaiter");
// (object)((CustomAwaitableType)awaitable).GetAwaiter(); Func<object, object> getResultFunc;
var customAwaitableParam = Expression.Parameter(typeof(object), "awaitable"); if (awaitableInfo.ResultType == typeof(void))
var awaitableInfo = coercedAwaitableInfo.AwaitableInfo; // var getResultFunc = (object awaiter) =>
var postCoercionMethodReturnType = coercedAwaitableInfo.CoercerResultType ?? methodInfo.ReturnType; // {
var getAwaiterFunc = Expression.Lambda<Func<object, object>>( // ((CustomAwaiterType)awaiter).GetResult(); // We need to invoke this to surface any exceptions
// return (object)null;
// };
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Block(
Expression.Call(
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
Expression.Constant(null)
),
getResultParam).Compile();
else
// var getResultFunc = (object awaiter) =>
// (object)((CustomAwaiterType)awaiter).GetResult();
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Convert( Expression.Convert(
Expression.Call( Expression.Call(
Expression.Convert(customAwaitableParam, postCoercionMethodReturnType), Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.GetAwaiterMethod), awaitableInfo.AwaiterGetResultMethod),
typeof(object)), typeof(object)),
customAwaitableParam).Compile(); getResultParam).Compile();
// var isCompletedFunc = (object awaiter) => // var onCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).IsCompleted; // ((CustomAwaiterType)awaiter).OnCompleted(continuation);
var isCompletedParam = Expression.Parameter(typeof(object), "awaiter"); // };
var isCompletedFunc = Expression.Lambda<Func<object, bool>>( var onCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
Expression.MakeMemberAccess( var onCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
Expression.Convert(isCompletedParam, awaitableInfo.AwaiterType), var onCompletedFunc = Expression.Lambda<Action<object, Action>>(
awaitableInfo.AwaiterIsCompletedProperty), Expression.Call(
isCompletedParam).Compile(); Expression.Convert(onCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterOnCompletedMethod,
var getResultParam = Expression.Parameter(typeof(object), "awaiter"); onCompletedParam2),
Func<object, object> getResultFunc; onCompletedParam1,
if (awaitableInfo.ResultType == typeof(void)) onCompletedParam2).Compile();
{
getResultFunc = Expression.Lambda<Func<object, object>>( Action<object, Action>? unsafeOnCompletedFunc = null;
Expression.Block( if (awaitableInfo.AwaiterUnsafeOnCompletedMethod != null)
Expression.Call( {
Expression.Convert(getResultParam, awaitableInfo.AwaiterType), // var unsafeOnCompletedFunc = (object awaiter, Action continuation) => {
awaitableInfo.AwaiterGetResultMethod), // ((CustomAwaiterType)awaiter).UnsafeOnCompleted(continuation);
Expression.Constant(null)
),
getResultParam).Compile();
}
else
{
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Convert(
Expression.Call(
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
typeof(object)),
getResultParam).Compile();
}
// var onCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).OnCompleted(continuation);
// }; // };
var onCompletedParam1 = Expression.Parameter(typeof(object), "awaiter"); var unsafeOnCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var onCompletedParam2 = Expression.Parameter(typeof(Action), "continuation"); var unsafeOnCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
var onCompletedFunc = Expression.Lambda<Action<object, Action>>( unsafeOnCompletedFunc = Expression.Lambda<Action<object, Action>>(
Expression.Call( Expression.Call(
Expression.Convert(onCompletedParam1, awaitableInfo.AwaiterType), Expression.Convert(unsafeOnCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterOnCompletedMethod, awaitableInfo.AwaiterUnsafeOnCompletedMethod,
onCompletedParam2), unsafeOnCompletedParam2),
onCompletedParam1, unsafeOnCompletedParam1,
onCompletedParam2).Compile(); unsafeOnCompletedParam2).Compile();
Action<object, Action> unsafeOnCompletedFunc = null;
if (awaitableInfo.AwaiterUnsafeOnCompletedMethod != null)
{
// var unsafeOnCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).UnsafeOnCompleted(continuation);
// };
var unsafeOnCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var unsafeOnCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
unsafeOnCompletedFunc = Expression.Lambda<Action<object, Action>>(
Expression.Call(
Expression.Convert(unsafeOnCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterUnsafeOnCompletedMethod,
unsafeOnCompletedParam2),
unsafeOnCompletedParam1,
unsafeOnCompletedParam2).Compile();
}
// If we need to pass the method call result through a coercer function to get an
// awaitable, then do so.
var coercedMethodCall = coercedAwaitableInfo.RequiresCoercion
? Expression.Invoke(coercedAwaitableInfo.CoercerExpression, methodCall)
: (Expression)methodCall;
// return new ObjectMethodExecutorAwaitable(
// (object)coercedMethodCall,
// getAwaiterFunc,
// isCompletedFunc,
// getResultFunc,
// onCompletedFunc,
// unsafeOnCompletedFunc);
var returnValueExpression = Expression.New(
_objectMethodExecutorAwaitableConstructor,
Expression.Convert(coercedMethodCall, typeof(object)),
Expression.Constant(getAwaiterFunc),
Expression.Constant(isCompletedFunc),
Expression.Constant(getResultFunc),
Expression.Constant(onCompletedFunc),
Expression.Constant(unsafeOnCompletedFunc, typeof(Action<object, Action>)));
var lambda =
Expression.Lambda<MethodExecutorAsync>(returnValueExpression, targetParameter, parametersParameter);
return lambda.Compile();
} }
private delegate ObjectMethodExecutorAwaitable MethodExecutorAsync(object target, params object[] parameters); // If we need to pass the method call result through a coercer function to get an
// awaitable, then do so.
var coercedMethodCall = coercedAwaitableInfo.RequiresCoercion
? Expression.Invoke(coercedAwaitableInfo.CoercerExpression, methodCall)
: (Expression)methodCall;
// return new ObjectMethodExecutorAwaitable(
// (object)coercedMethodCall,
// getAwaiterFunc,
// isCompletedFunc,
// getResultFunc,
// onCompletedFunc,
// unsafeOnCompletedFunc);
var returnValueExpression = Expression.New(
_objectMethodExecutorAwaitableConstructor,
Expression.Convert(coercedMethodCall, typeof(object)),
Expression.Constant(getAwaiterFunc),
Expression.Constant(isCompletedFunc),
Expression.Constant(getResultFunc),
Expression.Constant(onCompletedFunc),
Expression.Constant(unsafeOnCompletedFunc, typeof(Action<object, Action>)));
var lambda =
Expression.Lambda<MethodExecutorAsync>(returnValueExpression, targetParameter, parametersParameter);
return lambda.Compile();
}
private delegate object MethodExecutor(object target, params object[] parameters); private delegate ObjectMethodExecutorAwaitable MethodExecutorAsync(object target, object?[]? parameters);
private delegate void VoidMethodExecutor(object target, object[] parameters); private delegate object? MethodExecutor(object target, object?[]? parameters);
}
private delegate void VoidMethodExecutor(object target, object?[]? parameters);
} }

171
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/ObjectMethodExecutorAwaitable.cs

@ -1,115 +1,114 @@
using System; using System;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
namespace LINGYUN.Abp.EventBus.CAP.Internal namespace LINGYUN.Abp.EventBus.CAP.Internal;
/// <summary>
/// Provides a common awaitable structure that <see cref="ObjectMethodExecutor.ExecuteAsync" /> can
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
/// application-defined custom awaitable.
/// </summary>
internal readonly struct ObjectMethodExecutorAwaitable
{ {
/// <summary> private readonly object _customAwaitable;
/// Provides a common awaitable structure that <see cref="ObjectMethodExecutor.ExecuteAsync" /> can private readonly Func<object, object> _getAwaiterMethod;
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an private readonly Func<object, bool> _isCompletedMethod;
/// application-defined custom awaitable. private readonly Func<object, object> _getResultMethod;
/// </summary> private readonly Action<object, Action> _onCompletedMethod;
internal struct ObjectMethodExecutorAwaitable private readonly Action<object, Action> _unsafeOnCompletedMethod;
// Perf note: since we're requiring the customAwaitable to be supplied here as an object,
// this will trigger a further allocation if it was a value type (i.e., to box it). We can't
// fix this by making the customAwaitable type generic, because the calling code typically
// does not know the type of the awaitable/awaiter at compile-time anyway.
//
// However, we could fix it by not passing the customAwaitable here at all, and instead
// passing a func that maps directly from the target object (e.g., controller instance),
// target method (e.g., action method info), and params array to the custom awaiter in the
// GetAwaiter() method below. In effect, by delaying the actual method call until the
// upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance.
// This optimization is not currently implemented because:
// [1] It would make no difference when the awaitable was an object type, which is
// by far the most common scenario (e.g., System.Task<T>).
// [2] It would be complex - we'd need some kind of object pool to track all the parameter
// arrays until we needed to use them in GetAwaiter().
// We can reconsider this in the future if there's a need to optimize for ValueTask<T>
// or other value-typed awaitables.
public ObjectMethodExecutorAwaitable(
object customAwaitable,
Func<object, object> getAwaiterMethod,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaitable = customAwaitable;
_getAwaiterMethod = getAwaiterMethod;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}
public Awaiter GetAwaiter()
{
var customAwaiter = _getAwaiterMethod(_customAwaitable);
return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod,
_unsafeOnCompletedMethod);
}
public readonly struct Awaiter : ICriticalNotifyCompletion
{ {
private readonly object _customAwaitable; private readonly object _customAwaiter;
private readonly Func<object, object> _getAwaiterMethod;
private readonly Func<object, bool> _isCompletedMethod; private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod; private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod; private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod; private readonly Action<object, Action> _unsafeOnCompletedMethod;
// Perf note: since we're requiring the customAwaitable to be supplied here as an object, public Awaiter(
// this will trigger a further allocation if it was a value type (i.e., to box it). We can't object customAwaiter,
// fix this by making the customAwaitable type generic, because the calling code typically
// does not know the type of the awaitable/awaiter at compile-time anyway.
//
// However, we could fix it by not passing the customAwaitable here at all, and instead
// passing a func that maps directly from the target object (e.g., controller instance),
// target method (e.g., action method info), and params array to the custom awaiter in the
// GetAwaiter() method below. In effect, by delaying the actual method call until the
// upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance.
// This optimization is not currently implemented because:
// [1] It would make no difference when the awaitable was an object type, which is
// by far the most common scenario (e.g., System.Task<T>).
// [2] It would be complex - we'd need some kind of object pool to track all the parameter
// arrays until we needed to use them in GetAwaiter().
// We can reconsider this in the future if there's a need to optimize for ValueTask<T>
// or other value-typed awaitables.
public ObjectMethodExecutorAwaitable(
object customAwaitable,
Func<object, object> getAwaiterMethod,
Func<object, bool> isCompletedMethod, Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod, Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod, Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod) Action<object, Action> unsafeOnCompletedMethod)
{ {
_customAwaitable = customAwaitable; _customAwaiter = customAwaiter;
_getAwaiterMethod = getAwaiterMethod;
_isCompletedMethod = isCompletedMethod; _isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod; _getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod; _onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod; _unsafeOnCompletedMethod = unsafeOnCompletedMethod;
} }
public Awaiter GetAwaiter() public bool IsCompleted => _isCompletedMethod(_customAwaiter);
public object GetResult()
{ {
var customAwaiter = _getAwaiterMethod(_customAwaitable); return _getResultMethod(_customAwaiter);
return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod,
_unsafeOnCompletedMethod);
} }
public struct Awaiter : ICriticalNotifyCompletion public void OnCompleted(Action continuation)
{ {
private readonly object _customAwaiter; _onCompletedMethod(_customAwaiter, continuation);
private readonly Func<object, bool> _isCompletedMethod; }
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;
public Awaiter(
object customAwaiter,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaiter = customAwaiter;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}
public bool IsCompleted => _isCompletedMethod(_customAwaiter);
public object GetResult()
{
return _getResultMethod(_customAwaiter);
}
public void OnCompleted(Action continuation)
{
_onCompletedMethod(_customAwaiter, continuation);
}
public void UnsafeOnCompleted(Action continuation) public void UnsafeOnCompleted(Action continuation)
{ {
// If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted. // If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted.
// If not, fall back on using its OnCompleted. // If not, fall back on using its OnCompleted.
// //
// Why this is safe: // Why this is safe:
// - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it // - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it
// needs the execution context to be preserved (which it signals by calling OnCompleted), or // needs the execution context to be preserved (which it signals by calling OnCompleted), or
// that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not* // that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not*
// to preserve and restore the context, so we prefer that where possible. // to preserve and restore the context, so we prefer that where possible.
// - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted, // - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted,
// there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen // there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen
// if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to // if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to
// pass the call on to the underlying awaitable's OnCompleted method. // pass the call on to the underlying awaitable's OnCompleted method.
var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod; var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod;
underlyingMethodToUse(_customAwaiter, continuation); underlyingMethodToUse(_customAwaiter, continuation);
}
} }
} }
} }

231
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/LINGYUN/Abp/EventBus/CAP/Internal/ObjectMethodExecutorFSharpSupport.cs

@ -8,138 +8,131 @@ using System.Reflection;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace LINGYUN.Abp.EventBus.CAP.Internal namespace LINGYUN.Abp.EventBus.CAP.Internal;
/// <summary>
/// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying
/// an <see cref="Expression" /> for mapping instances of that type to a C# awaitable.
/// </summary>
/// <remarks>
/// The main design goal here is to avoid taking a compile-time dependency on
/// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references
/// to FSharp types have to be constructed dynamically at runtime.
/// </remarks>
internal static class ObjectMethodExecutorFSharpSupport
{ {
/// <summary> private static readonly object _fsharpValuesCacheLock = new();
/// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying private static Assembly _fsharpCoreAssembly;
/// an <see cref="Expression" /> for mapping instances of that type to a C# awaitable. private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod;
/// </summary> private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty;
/// <remarks> private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty;
/// The main design goal here is to avoid taking a compile-time dependency on
/// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references public static bool TryBuildCoercerFromFSharpAsyncToAwaitable(
/// to FSharp types have to be constructed dynamically at runtime. Type possibleFSharpAsyncType,
/// </remarks> out Expression coerceToAwaitableExpression,
internal static class ObjectMethodExecutorFSharpSupport out Type awaitableType)
{ {
private static readonly object _fsharpValuesCacheLock = new object(); var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType
private static Assembly _fsharpCoreAssembly; ? possibleFSharpAsyncType.GetGenericTypeDefinition()
private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod; : null;
private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty;
private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty;
public static bool TryBuildCoercerFromFSharpAsyncToAwaitable(
Type possibleFSharpAsyncType,
out Expression coerceToAwaitableExpression,
out Type awaitableType)
{
var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType
? possibleFSharpAsyncType.GetGenericTypeDefinition()
: null;
if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType))
{
coerceToAwaitableExpression = null;
awaitableType = null;
return false;
}
var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single(); if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType))
awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType); {
coerceToAwaitableExpression = null;
// coerceToAwaitableExpression = (object fsharpAsync) => awaitableType = null;
// { return false;
// return (object)FSharpAsync.StartAsTask<TResult>(
// (Microsoft.FSharp.Control.FSharpAsync<TResult>)fsharpAsync,
// FSharpOption<TaskCreationOptions>.None,
// FSharpOption<CancellationToken>.None);
// };
var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod
.MakeGenericMethod(awaiterResultType);
var coerceToAwaitableParam = Expression.Parameter(typeof(object));
coerceToAwaitableExpression = Expression.Lambda(
Expression.Convert(
Expression.Call(
startAsTaskClosedMethod,
Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType),
Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty),
Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)),
typeof(object)),
coerceToAwaitableParam);
return true;
} }
private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType) var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single();
{ awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType);
var typeFullName = possibleFSharpAsyncGenericType?.FullName;
if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal)) // coerceToAwaitableExpression = (object fsharpAsync) =>
{ // {
return false; // return (object)FSharpAsync.StartAsTask<TResult>(
} // (Microsoft.FSharp.Control.FSharpAsync<TResult>)fsharpAsync,
// FSharpOption<TaskCreationOptions>.None,
// FSharpOption<CancellationToken>.None);
// };
var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod
.MakeGenericMethod(awaiterResultType);
var coerceToAwaitableParam = Expression.Parameter(typeof(object));
coerceToAwaitableExpression = Expression.Lambda(
Expression.Convert(
Expression.Call(
startAsTaskClosedMethod,
Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType),
Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty),
Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)),
typeof(object)),
coerceToAwaitableParam);
return true;
}
lock (_fsharpValuesCacheLock) private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType)
{ {
if (_fsharpCoreAssembly != null) var typeFullName = possibleFSharpAsyncGenericType?.FullName;
{ if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal))
return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly; return false;
}
return TryPopulateFSharpValueCaches(possibleFSharpAsyncGenericType); lock (_fsharpValuesCacheLock)
} {
if (_fsharpCoreAssembly != null)
// Since we've already found the real FSharpAsync.Core assembly, we just have
// to check that the supplied FSharpAsync`1 type is the one from that assembly.
return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly;
// We'll keep trying to find the FSharp types/values each time any type called
// FSharpAsync`1 is supplied.
return TryPopulateFSharpValueCaches(possibleFSharpAsyncGenericType);
} }
}
private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType) private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType)
{
var assembly = possibleFSharpAsyncGenericType.Assembly;
var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1");
var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync");
if (fsharpOptionType == null || fsharpAsyncType == null) return false;
// Get a reference to FSharpOption<TaskCreationOptions>.None
var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType
.MakeGenericType(typeof(TaskCreationOptions));
_fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType
.GetRuntimeProperty("None");
// Get a reference to FSharpOption<CancellationToken>.None
var fsharpOptionOfCancellationTokenType = fsharpOptionType
.MakeGenericType(typeof(CancellationToken));
_fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType
.GetRuntimeProperty("None");
// Get a reference to FSharpAsync.StartAsTask<>
var fsharpAsyncMethods = fsharpAsyncType
.GetRuntimeMethods()
.Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal));
foreach (var candidateMethodInfo in fsharpAsyncMethods)
{ {
var assembly = possibleFSharpAsyncGenericType.Assembly; var parameters = candidateMethodInfo.GetParameters();
var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1"); if (parameters.Length == 3
var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync"); && TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType)
&& parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType
if (fsharpOptionType == null || fsharpAsyncType == null) && parameters[2].ParameterType == fsharpOptionOfCancellationTokenType)
{
return false;
}
// Get a reference to FSharpOption<TaskCreationOptions>.None
var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType
.MakeGenericType(typeof(TaskCreationOptions));
_fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType
.GetTypeInfo()
.GetRuntimeProperty("None");
// Get a reference to FSharpOption<CancellationToken>.None
var fsharpOptionOfCancellationTokenType = fsharpOptionType
.MakeGenericType(typeof(CancellationToken));
_fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType
.GetTypeInfo()
.GetRuntimeProperty("None");
// Get a reference to FSharpAsync.StartAsTask<>
var fsharpAsyncMethods = fsharpAsyncType
.GetRuntimeMethods()
.Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal));
foreach (var candidateMethodInfo in fsharpAsyncMethods)
{ {
var parameters = candidateMethodInfo.GetParameters(); // This really does look like the correct method (and hence assembly).
if (parameters.Length == 3 _fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo;
&& TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType) _fsharpCoreAssembly = assembly;
&& parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType break;
&& parameters[2].ParameterType == fsharpOptionOfCancellationTokenType)
{
// This really does look like the correct method (and hence assembly).
_fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo;
_fsharpCoreAssembly = assembly;
break;
}
} }
return _fsharpCoreAssembly != null;
} }
private static bool TypesHaveSameIdentity(Type type1, Type type2) return _fsharpCoreAssembly != null;
{ }
return type1.Assembly == type2.Assembly
&& string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal) private static bool TypesHaveSameIdentity(Type type1, Type type2)
&& string.Equals(type1.Name, type2.Name, StringComparison.Ordinal); {
} return type1.Assembly == type2.Assembly
&& string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal)
&& string.Equals(type1.Name, type2.Name, StringComparison.Ordinal);
} }
} }

33
aspnet-core/framework/common/LINGYUN.Abp.EventBus.CAP/Microsoft/Extensions/DependencyInjection/ServiceCollectionExtensions.cs

@ -4,26 +4,25 @@ using DotNetCore.CAP.Serialization;
using LINGYUN.Abp.EventBus.CAP; using LINGYUN.Abp.EventBus.CAP;
using System; using System;
namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection;
/// <summary>
/// CAP ServiceCollectionExtensions
/// </summary>
public static class ServiceCollectionExtensions
{ {
/// <summary> /// <summary>
/// CAP ServiceCollectionExtensions /// Adds and configures the consistence services for the consistency.
/// </summary> /// </summary>
public static class ServiceCollectionExtensions /// <param name="services"></param>
/// <param name="capAction"></param>
/// <returns></returns>
public static IServiceCollection AddCAPEventBus(this IServiceCollection services, Action<CapOptions> capAction)
{ {
/// <summary> services.AddCap(capAction);
/// Adds and configures the consistence services for the consistency. // 替换为自己的实现
/// </summary> services.AddSingleton<ISubscribeInvoker, AbpCAPSubscribeInvoker>();
/// <param name="services"></param> services.AddSingleton<ISerializer, AbpCapSerializer>();
/// <param name="capAction"></param> return services;
/// <returns></returns>
public static IServiceCollection AddCAPEventBus(this IServiceCollection services, Action<CapOptions> capAction)
{
services.AddCap(capAction);
// 替换为自己的实现
services.AddSingleton<ISubscribeInvoker, AbpCAPSubscribeInvoker>();
services.AddSingleton<ISerializer, AbpCapSerializer>();
return services;
}
} }
} }

7
aspnet-core/modules/identityServer/LINGYUN.Abp.IdentityServer.Session/LINGYUN/Abp/IdentityServer/Session/AbpIdentityServerSessionModule.cs

@ -3,7 +3,9 @@ using LINGYUN.Abp.Identity;
using LINGYUN.Abp.Identity.Session; using LINGYUN.Abp.Identity.Session;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection.Extensions;
using Volo.Abp.IdentityServer;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Volo.Abp.Security.Claims;
namespace LINGYUN.Abp.IdentityServer.Session; namespace LINGYUN.Abp.IdentityServer.Session;
@ -20,6 +22,11 @@ public class AbpIdentityServerSessionModule : AbpModule
options.EventServiceHandlers.Add<AbpIdentitySessionEventServiceHandler>(); options.EventServiceHandlers.Add<AbpIdentitySessionEventServiceHandler>();
}); });
Configure<AbpClaimsServiceOptions>(options =>
{
options.RequestedClaims.Add(AbpClaimTypes.SessionId);
});
Configure<IdentitySessionSignInOptions>(options => Configure<IdentitySessionSignInOptions>(options =>
{ {
// UserLoginSuccessEvent由IdentityServer发布, 无需显式保存会话 // UserLoginSuccessEvent由IdentityServer发布, 无需显式保存会话

4
aspnet-core/services/LY.MicroService.AuthServer/appsettings.Development.json

@ -88,7 +88,9 @@
}, },
"AuthServer": { "AuthServer": {
"Authority": "http://127.0.0.1:44385/", "Authority": "http://127.0.0.1:44385/",
"ApiName": "lingyun-abp-application" "Audience": "lingyun-abp-application",
"MapInboundClaims": false,
"RequireHttpsMetadata": false
}, },
"OpenIddict": { "OpenIddict": {
"Applications": { "Applications": {

4
aspnet-core/services/LY.MicroService.IdentityServer/appsettings.Development.json

@ -88,7 +88,9 @@
}, },
"AuthServer": { "AuthServer": {
"Authority": "http://127.0.0.1:44385/", "Authority": "http://127.0.0.1:44385/",
"ApiName": "lingyun-abp-application" "Audience": "lingyun-abp-application",
"MapInboundClaims": false,
"RequireHttpsMetadata": false
}, },
"IdentityServer": { "IdentityServer": {
"Clients": { "Clients": {

6
aspnet-core/templates/content/Directory.Packages.props

@ -1,9 +1,9 @@
<Project> <Project>
<PropertyGroup> <PropertyGroup>
<DotNetCoreCAPPackageVersion>8.1.1</DotNetCoreCAPPackageVersion> <DotNetCoreCAPPackageVersion>8.2.0</DotNetCoreCAPPackageVersion>
<ElsaPackageVersion>2.14.1</ElsaPackageVersion> <ElsaPackageVersion>2.14.1</ElsaPackageVersion>
<VoloAbpPackageVersion>8.2.0</VoloAbpPackageVersion> <VoloAbpPackageVersion>8.2.1</VoloAbpPackageVersion>
<LINGYUNAbpPackageVersion>8.2.0</LINGYUNAbpPackageVersion> <LINGYUNAbpPackageVersion>8.2.1</LINGYUNAbpPackageVersion>
<MicrosoftExtensionsPackageVersion>8.0.0</MicrosoftExtensionsPackageVersion> <MicrosoftExtensionsPackageVersion>8.0.0</MicrosoftExtensionsPackageVersion>
<MicrosoftAspNetCorePackageVersion>8.0.0</MicrosoftAspNetCorePackageVersion> <MicrosoftAspNetCorePackageVersion>8.0.0</MicrosoftAspNetCorePackageVersion>
<MicrosoftEntityFrameworkCorePackageVersion>8.0.0</MicrosoftEntityFrameworkCorePackageVersion> <MicrosoftEntityFrameworkCorePackageVersion>8.0.0</MicrosoftEntityFrameworkCorePackageVersion>

4
aspnet-core/templates/content/common.props

@ -1,12 +1,12 @@
<Project> <Project>
<PropertyGroup> <PropertyGroup>
<LangVersion>latest</LangVersion> <LangVersion>latest</LangVersion>
<Version>8.2.0</Version> <Version>8.2.1</Version>
<Authors>colin</Authors> <Authors>colin</Authors>
<NoWarn>$(NoWarn);CS1591;CS0436;CS8618;NU1803</NoWarn> <NoWarn>$(NoWarn);CS1591;CS0436;CS8618;NU1803</NoWarn>
<PackageProjectUrl>https://github.com/colinin/abp-next-admin</PackageProjectUrl> <PackageProjectUrl>https://github.com/colinin/abp-next-admin</PackageProjectUrl>
<PackageOutputPath>$(SolutionDir)LocalNuget</PackageOutputPath> <PackageOutputPath>$(SolutionDir)LocalNuget</PackageOutputPath>
<PackageVersion>8.2.0</PackageVersion> <PackageVersion>8.2.1</PackageVersion>
<PackageLicenseExpression>MIT</PackageLicenseExpression> <PackageLicenseExpression>MIT</PackageLicenseExpression>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/colinin/abp-next-admin</RepositoryUrl> <RepositoryUrl>https://github.com/colinin/abp-next-admin</RepositoryUrl>

4
common.props

@ -1,12 +1,12 @@
<Project> <Project>
<PropertyGroup> <PropertyGroup>
<LangVersion>latest</LangVersion> <LangVersion>latest</LangVersion>
<Version>8.2.0</Version> <Version>8.2.1</Version>
<Authors>colin</Authors> <Authors>colin</Authors>
<NoWarn>$(NoWarn);CS1591;CS0436;CS8618;NU1803</NoWarn> <NoWarn>$(NoWarn);CS1591;CS0436;CS8618;NU1803</NoWarn>
<PackageProjectUrl>https://github.com/colinin/abp-next-admin</PackageProjectUrl> <PackageProjectUrl>https://github.com/colinin/abp-next-admin</PackageProjectUrl>
<PackageOutputPath>$(SolutionDir)LocalNuget</PackageOutputPath> <PackageOutputPath>$(SolutionDir)LocalNuget</PackageOutputPath>
<PackageVersion>8.2.0</PackageVersion> <PackageVersion>8.2.1</PackageVersion>
<PackageLicenseExpression>MIT</PackageLicenseExpression> <PackageLicenseExpression>MIT</PackageLicenseExpression>
<RepositoryType>git</RepositoryType> <RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/colinin/abp-next-admin</RepositoryUrl> <RepositoryUrl>https://github.com/colinin/abp-next-admin</RepositoryUrl>

Loading…
Cancel
Save