|
|
|
@ -15,6 +15,7 @@ |
|
|
|
///
|
|
|
|
|
|
|
|
import config from 'config'; |
|
|
|
import fs from 'node:fs'; |
|
|
|
import { _logger, KafkaJsWinstonLogCreator } from '../config/logger'; |
|
|
|
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' |
|
|
|
import { IQueue } from './queue.models'; |
|
|
|
@ -29,8 +30,10 @@ import { |
|
|
|
Producer, |
|
|
|
TopicMessages |
|
|
|
} from 'kafkajs'; |
|
|
|
import { isNotEmptyStr } from '../api/utils'; |
|
|
|
import { KeyObject } from 'tls'; |
|
|
|
|
|
|
|
import process, { kill, exit } from 'process'; |
|
|
|
import process, { exit, kill } from 'process'; |
|
|
|
|
|
|
|
export class KafkaTemplate implements IQueue { |
|
|
|
|
|
|
|
@ -64,6 +67,7 @@ export class KafkaTemplate implements IQueue { |
|
|
|
const queuePrefix: string = config.get('queue_prefix'); |
|
|
|
const requestTopic: string = queuePrefix ? queuePrefix + "." + config.get('request_topic') : config.get('request_topic'); |
|
|
|
const useConfluent = config.get('kafka.use_confluent_cloud'); |
|
|
|
const enabledSsl = Boolean(config.get('kafka.ssl.enabled')); |
|
|
|
const groupId:string = queuePrefix ? queuePrefix + ".js-executor-group" : "js-executor-group"; |
|
|
|
this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); |
|
|
|
this.logger.info('Kafka Requests Topic: %s', requestTopic); |
|
|
|
@ -93,6 +97,31 @@ export class KafkaTemplate implements IQueue { |
|
|
|
kafkaConfig['ssl'] = true; |
|
|
|
} |
|
|
|
|
|
|
|
if (enabledSsl) { |
|
|
|
const certFilePath: string = config.has('kafka.ssl.cert_file') ? config.get('kafka.ssl.cert_file') : ''; |
|
|
|
const keyFilePath: string = config.has('kafka.ssl.key_file') ? config.get('kafka.ssl.key_file') : ''; |
|
|
|
const keyPassword: string = config.has('kafka.ssl.key_password') ? config.get('kafka.ssl.key_password') : ''; |
|
|
|
const caFilePath: string = config.has('kafka.ssl.ca_file') ? config.get('kafka.ssl.ca_file') : ''; |
|
|
|
|
|
|
|
kafkaConfig.ssl = {}; |
|
|
|
|
|
|
|
if (isNotEmptyStr(certFilePath)) { |
|
|
|
kafkaConfig.ssl.cert = fs.readFileSync(certFilePath, 'utf-8'); |
|
|
|
} |
|
|
|
|
|
|
|
if (isNotEmptyStr(keyFilePath)) { |
|
|
|
const keyConfig: KeyObject = {pem: fs.readFileSync(keyFilePath, 'utf-8')}; |
|
|
|
if (isNotEmptyStr(keyPassword)) { |
|
|
|
keyConfig.passphrase = keyPassword; |
|
|
|
} |
|
|
|
kafkaConfig.ssl.key = [keyConfig]; |
|
|
|
} |
|
|
|
|
|
|
|
if (isNotEmptyStr(caFilePath)) { |
|
|
|
kafkaConfig.ssl.ca = fs.readFileSync(caFilePath, 'utf-8'); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
this.parseTopicProperties(); |
|
|
|
|
|
|
|
this.kafkaClient = new Kafka(kafkaConfig); |
|
|
|
@ -213,6 +242,7 @@ export class KafkaTemplate implements IQueue { |
|
|
|
|
|
|
|
private createTopic(topic: string, partitions: number): Promise<boolean> { |
|
|
|
return this.kafkaAdmin.createTopics({ |
|
|
|
timeout: this.requestTimeout, |
|
|
|
topics: [{ |
|
|
|
topic: topic, |
|
|
|
numPartitions: partitions, |
|
|
|
|