Browse Source
Merge pull request #7991 from AndreMaz/feature/kafka_connection_timeout
Allow to configure kafkajs `connectionTimeout` property
pull/8343/head
Andrew Shvayka
3 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with
5 additions and
0 deletions
msa/js-executor/config/custom-environment-variables.yml
msa/js-executor/config/default.yml
msa/js-executor/queue/kafkaTemplate.ts
@ -32,6 +32,7 @@ kafka:
linger_ms : "TB_KAFKA_LINGER_MS" # for producer
partitions_consumed_concurrently : "TB_KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
requestTimeout : "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS"
connectionTimeout : "TB_KAFKA_CONNECTION_TIMEOUT_MS"
compression : "TB_QUEUE_KAFKA_COMPRESSION" # gzip or uncompressed
topic_properties : "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
use_confluent_cloud : "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD"
@ -32,6 +32,7 @@ kafka:
linger_ms : "5" # for producer
partitions_consumed_concurrently : "1" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
requestTimeout : "30000" # The default value in kafkajs is: 30000
connectionTimeout : "1000" # The default value in kafkajs is: 1000
compression : "gzip" # gzip or uncompressed
topic_properties : "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1"
use_confluent_cloud : false
@ -42,6 +42,7 @@ export class KafkaTemplate implements IQueue {
private maxBatchSize = Number ( config . get ( 'kafka.batch_size' ) ) ;
private linger = Number ( config . get ( 'kafka.linger_ms' ) ) ;
private requestTimeout = Number ( config . get ( 'kafka.requestTimeout' ) ) ;
private connectionTimeout = Number ( config . get ( 'kafka.connectionTimeout' ) ) ;
private compressionType = ( config . get ( 'kafka.compression' ) === "gzip" ) ? CompressionTypes.GZIP : CompressionTypes.None ;
private partitionsConsumedConcurrently = Number ( config . get ( 'kafka.partitions_consumed_concurrently' ) ) ;
@ -80,6 +81,8 @@ export class KafkaTemplate implements IQueue {
kafkaConfig [ 'requestTimeout' ] = this . requestTimeout ;
kafkaConfig [ 'connectionTimeout' ] = this . connectionTimeout ;
if ( useConfluent ) {
kafkaConfig [ 'sasl' ] = {
mechanism : config.get ( 'kafka.confluent.sasl.mechanism' ) as any ,