|
|
|
@@ -51,108 +51,103 @@ export class KafkaTemplate implements IQueue {
     private batchMessages: TopicMessages[] = [];
     private sendLoopInstance: NodeJS.Timeout;

+    name = 'Kafka';
+
     constructor() {
     }
     async init(): Promise<void> {
         try {
             const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers');
             const requestTopic: string = config.get('request_topic');
             const useConfluent = config.get('kafka.use_confluent_cloud');

             this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
             this.logger.info('Kafka Requests Topic: %s', requestTopic);

             let kafkaConfig: KafkaConfig = {
                 brokers: kafkaBootstrapServers.split(','),
                 logLevel: logLevel.INFO,
                 logCreator: KafkaJsWinstonLogCreator
             };

             if (this.kafkaClientId) {
                 kafkaConfig['clientId'] = this.kafkaClientId;
             } else {
                 this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID');
             }

             kafkaConfig['requestTimeout'] = this.requestTimeout;

             if (useConfluent) {
                 kafkaConfig['sasl'] = {
                     mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
                     username: config.get('kafka.confluent.username'),
                     password: config.get('kafka.confluent.password')
                 };
                 kafkaConfig['ssl'] = true;
             }

             this.parseTopicProperties();

             this.kafkaClient = new Kafka(kafkaConfig);
             this.kafkaAdmin = this.kafkaClient.admin();
             await this.kafkaAdmin.connect();

             let partitions = 1;

             for (let i = 0; i < this.configEntries.length; i++) {
                 let param = this.configEntries[i];
                 if (param.name === 'partitions') {
                     partitions = param.value;
                     this.configEntries.splice(i, 1);
                     break;
                 }
             }

             let topics = await this.kafkaAdmin.listTopics();

             if (!topics.includes(requestTopic)) {
                 let createRequestTopicResult = await this.createTopic(requestTopic, partitions);
                 if (createRequestTopicResult) {
                     this.logger.info('Created new topic: %s', requestTopic);
                 }
             }

             this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'});
             this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner});

             const {CRASH} = this.consumer.events;

             this.consumer.on(CRASH, async (e) => {
                 this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`);
                 if (!e.payload.restart) {
                     this.logger.error('Going to exit due to not retryable error!');
-                    await this.destroy();
+                    await this.destroy(-1);
                 }
             });

             const messageProcessor = new JsInvokeMessageProcessor(this);
             await this.consumer.connect();
             await this.producer.connect();
             this.sendLoopWithLinger();
             await this.consumer.subscribe({topic: requestTopic});

             await this.consumer.run({
                 partitionsConsumedConcurrently: this.partitionsConsumedConcurrently,
                 eachMessage: async ({topic, partition, message}) => {
                     let headers = message.headers;
                     let key = message.key || new Buffer([]);
                     let msg = {
                         key: key.toString('utf8'),
                         data: message.value,
                         headers: {
                             data: headers
                         }
                     };
                     messageProcessor.onJsInvokeMessage(msg);
                 },
             });
         } catch (e: any) {
             this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
             this.logger.error(e.stack);
             await this.destroy(-1);
         }
     }

     async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
         this.logger.debug('Pending queue response, scriptId: [%s]', scriptId);
@@ -232,15 +227,7 @@
         }, this.linger);
     }

-    static async build(): Promise<KafkaTemplate> {
-        const queue = new KafkaTemplate();
-        await queue.init();
-        return queue;
-    }
-
-    async destroy(): Promise<void> {
-        this.logger.info('Stopping Kafka resources...');
+    async destroy(status: number): Promise<void> {
+        this.logger.info('Exiting with status: %d ...', status);

         if (this.kafkaAdmin) {
@@ -267,7 +254,6 @@
             }
         }

         this.logger.info('Kafka resources stopped.');
+        process.exit(status);
     }

     private async disconnectProducer(): Promise<void> {
@@ -287,4 +273,5 @@
             }
         }
     }

 }
|