@ -162,13 +162,13 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected async override Task PublishToEventBusAsync ( Type eventType , object eventData )
{
await PublishAsync (
AbpKafkaEventBusOptions . TopicName ,
eventType ,
eventData ,
new Headers
{
{ "messageId" , System . Text . Encoding . UTF8 . GetBytes ( Guid . NewGuid ( ) . ToString ( "N" ) ) }
} ,
null
{ "messageId" , System . Text . Encoding . UTF8 . GetBytes ( Guid . NewGuid ( ) . ToString ( "N" ) ) }
}
) ;
}
@ -188,42 +188,31 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
new Headers
{
{ "messageId" , System . Text . Encoding . UTF8 . GetBytes ( outgoingEvent . Id . ToString ( "N" ) ) }
} ,
null
}
) ;
}
public override Task PublishManyFromOutboxAsync ( IEnumerable < OutgoingEventInfo > outgoingEvents , OutboxConfig outboxConfig )
{
var producer = ProducerPool . Get ( ) ;
var producer = ProducerPool . Get ( AbpKafkaEventBusOptions . ConnectionName ) ;
var outgoingEventArray = outgoingEvents . ToArray ( ) ;
producer . BeginTransaction ( ) ;
t ry
foreach ( va r outgoingEvent in outgoingEventArra y )
{
foreach ( var outgoingEvent in outgoingEventArray )
var messageId = outgoingEvent . Id . ToString ( "N" ) ;
var headers = new Headers
{
var messageId = outgoingEvent . Id . ToString ( "N" ) ;
var headers = new Headers
{
{ "messageId" , System . Text . Encoding . UTF8 . GetBytes ( messageId ) }
} ;
producer . Produce (
AbpKafkaEventBusOptions . TopicName ,
new Message < string , byte [ ] >
{
Key = outgoingEvent . EventName ,
Value = outgoingEvent . EventData ,
Headers = headers
} ) ;
}
{ "messageId" , System . Text . Encoding . UTF8 . GetBytes ( messageId ) }
} ;
producer . CommitTransaction ( ) ;
}
catch ( Exception e )
{
producer . AbortTransaction ( ) ;
throw ;
producer . Produce (
AbpKafkaEventBusOptions . TopicName ,
new Message < string , byte [ ] >
{
Key = outgoingEvent . EventName ,
Value = outgoingEvent . EventData ,
Headers = headers
} ) ;
}
return Task . CompletedTask ;
@ -253,47 +242,22 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
return Serializer . Serialize ( eventData ) ;
}
public virtual async Task PublishAsync ( Type eventType , object eventData , Headers headers , Dictionary < string , object > headersArguments )
{
await PublishAsync (
AbpKafkaEventBusOptions . TopicName ,
eventType ,
eventData ,
headers ,
headersArguments
) ;
}
private Task PublishAsync ( string topicName , Type eventType , object eventData , Headers headers , Dictionary < string , object > headersArguments )
private Task PublishAsync ( string topicName , Type eventType , object eventData , Headers headers )
{
var eventName = EventNameAttribute . GetNameOrDefault ( eventType ) ;
var body = Serializer . Serialize ( eventData ) ;
return PublishAsync ( topicName , eventName , body , headers , headersArguments ) ;
return PublishAsync ( topicName , eventName , body , headers ) ;
}
private Task < DeliveryResult < string , byte [ ] > > PublishAsync (
string topicName ,
string eventName ,
byte [ ] body ,
Headers headers ,
Dictionary < string , object > headersArguments )
Headers headers )
{
var producer = ProducerPool . Get ( AbpKafkaEventBusOptions . ConnectionName ) ;
return PublishAsync ( producer , topicName , eventName , body , headers , headersArguments ) ;
}
private Task < DeliveryResult < string , byte [ ] > > PublishAsync (
IProducer < string , byte [ ] > producer ,
string topicName ,
string eventName ,
byte [ ] body ,
Headers headers ,
Dictionary < string , object > headersArguments )
{
SetEventMessageHeaders ( headers , headersArguments ) ;
return producer . ProduceAsync (
topicName ,
new Message < string , byte [ ] >
@ -304,20 +268,6 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
} ) ;
}
private void SetEventMessageHeaders ( Headers headers , Dictionary < string , object > headersArguments )
{
if ( headersArguments = = null )
{
return ;
}
foreach ( var header in headersArguments )
{
headers . Remove ( header . Key ) ;
headers . Add ( header . Key , Serializer . Serialize ( header . Value ) ) ;
}
}
private List < IEventHandlerFactory > GetOrCreateHandlerFactories ( Type eventType )
{
return HandlerFactories . GetOrAdd (