From bbc43c2572bb9aa44dc939969fc9486567dc7dbc Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Thu, 7 Jul 2022 17:15:59 +0300 Subject: [PATCH 1/3] Graceful shutdown JavaScript Executor Microservice and minor change in logs --- msa/js-executor/api/httpServer.ts | 31 ++++++++++--- msa/js-executor/queue/awsSqsTemplate.ts | 22 +++++---- msa/js-executor/queue/kafkaTemplate.ts | 27 ++++++----- msa/js-executor/queue/pubSubTemplate.ts | 19 ++++---- msa/js-executor/queue/queue.models.ts | 2 +- msa/js-executor/queue/rabbitmqTemplate.ts | 29 ++++++------ msa/js-executor/queue/serviceBusTemplate.ts | 33 +++++++++----- msa/js-executor/server.ts | 50 ++++++++++++--------- 8 files changed, 122 insertions(+), 91 deletions(-) diff --git a/msa/js-executor/api/httpServer.ts b/msa/js-executor/api/httpServer.ts index f3671a1b89..e1c294fdff 100644 --- a/msa/js-executor/api/httpServer.ts +++ b/msa/js-executor/api/httpServer.ts @@ -16,12 +16,15 @@ import express from 'express'; import { _logger} from '../config/logger'; +import http from 'http'; +import { Socket } from 'net'; export class HttpServer { private logger = _logger('httpServer'); private app = express(); - private server; + private server: http.Server | null; + private connections: Socket[] = []; constructor(httpPort: number) { this.app.get('/livenessProbe', async (req, res) => { @@ -32,15 +35,31 @@ export class HttpServer { }) this.server = this.app.listen(httpPort, () => { - this.logger.info('Started http endpoint on port %s. Please, use /livenessProbe !', httpPort); + this.logger.info('Started HTTP endpoint on port %s. Please, use /livenessProbe !', httpPort); }).on('error', (error) => { this.logger.error(error); }); - } - stop() { - this.server.close(() => { - this.logger.info('Http server stop'); + this.server.on('connection', connection => { + this.connections.push(connection); + connection.on('close', () => this.connections = this.connections.filter(curr => curr !== connection)); }); } + + async stop() { + if (this.server) { + this.logger.info('Stopping HTTP Server...'); + const _server = this.server; + this.server = null; + this.connections.forEach(curr => curr.end(() => curr.destroy())); + await new Promise( + (resolve, reject) => { + _server.close((err) => { + this.logger.info('HTTP Server stopped.'); + resolve(); + }); + } + ); + } + } } diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 7bbf2b28fc..28d421269b 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -59,8 +59,6 @@ export class AwsSqsTemplate implements IQueue { async init() { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - this.sqsClient = new SQSClient({ apiVersion: '2012-11-05', credentials: { @@ -129,7 +127,7 @@ export class AwsSqsTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -193,23 +191,23 @@ export class AwsSqsTemplate implements IQueue { return queue; } - async exit(status: number) { + async destroy(status: number): Promise { this.stopped = true; this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping AWS SQS resources...'); if (this.sqsClient) { - this.logger.info('Stopping Aws Sqs client.') + this.logger.info('Stopping AWS SQS client...'); try { - this.sqsClient.destroy(); + const _sqsClient = this.sqsClient; // @ts-ignore delete this.sqsClient; - this.logger.info('Aws Sqs client stopped.') - process.exit(status); + _sqsClient.destroy(); + this.logger.info('AWS SQS client stopped.'); } catch (e: any) { - this.logger.info('Aws Sqs client stop error.'); - process.exit(status); + this.logger.info('AWS SQS client stop error.'); } - } else { - process.exit(status); } + this.logger.info('AWS SQS resources stopped.') + process.exit(status); } } diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 8c7e7736c8..2b3e947b5e 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -56,8 +56,6 @@ export class KafkaTemplate implements IQueue { async init(): Promise { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); const requestTopic: string = config.get('request_topic'); const useConfluent = config.get('kafka.use_confluent_cloud'); @@ -119,11 +117,11 @@ export class KafkaTemplate implements IQueue { const {CRASH} = this.consumer.events; - this.consumer.on(CRASH, e => { + this.consumer.on(CRASH, async (e) => { this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); if (!e.payload.restart) { this.logger.error('Going to exit due to not retryable error!'); - this.exit(-1); + await this.destroy(-1); } }); @@ -133,7 +131,6 @@ export class KafkaTemplate implements IQueue { this.sendLoopWithLinger(); await this.consumer.subscribe({topic: requestTopic}); - this.logger.info('Started ThingsBoard JavaScript Executor Microservice.'); await this.consumer.run({ partitionsConsumedConcurrently: this.partitionsConsumedConcurrently, eachMessage: async ({topic, partition, message}) => { @@ -153,7 +150,7 @@ export class KafkaTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -242,34 +239,35 @@ export class KafkaTemplate implements IQueue { } - async exit(status: number): Promise { + async destroy(status: number): Promise { this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping Kafka resources...'); if (this.kafkaAdmin) { this.logger.info('Stopping Kafka Admin...'); - await this.kafkaAdmin.disconnect(); + const _kafkaAdmin = this.kafkaAdmin; // @ts-ignore delete this.kafkaAdmin; + await _kafkaAdmin.disconnect(); this.logger.info('Kafka Admin stopped.'); } if (this.consumer) { this.logger.info('Stopping Kafka Consumer...'); try { - await this.consumer.disconnect(); + const _consumer = this.consumer; // @ts-ignore delete this.consumer; + await _consumer.disconnect(); this.logger.info('Kafka Consumer stopped.'); await this.disconnectProducer(); - process.exit(status); } catch (e: any) { this.logger.info('Kafka Consumer stop error.'); await this.disconnectProducer(); - process.exit(status); } - } else { - process.exit(status); } + this.logger.info('Kafka resources stopped.'); + process.exit(status); } private async disconnectProducer(): Promise { @@ -279,9 +277,10 @@ export class KafkaTemplate implements IQueue { this.logger.info('Stopping loop...'); clearTimeout(this.sendLoopInstance); await this.sendMessagesAsBatch(); - await this.producer.disconnect(); + const _producer = this.producer; // @ts-ignore delete this.producer; + await _producer.disconnect(); this.logger.info('Kafka Producer stopped.'); } catch (e) { this.logger.info('Kafka Producer stop error.'); diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index f14aaf5771..4e8990a105 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -39,7 +39,6 @@ export class PubSubTemplate implements IQueue { async init() { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); this.pubSubClient = new PubSub({ projectId: this.projectId, credentials: this.credentials @@ -82,7 +81,7 @@ export class PubSubTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -153,23 +152,23 @@ export class PubSubTemplate implements IQueue { return queue; } - async exit(status: number): Promise { + async destroy(status: number): Promise { this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping Pub/Sub resources...'); if (this.pubSubClient) { - this.logger.info('Stopping Pub/Sub client.') + this.logger.info('Stopping Pub/Sub client...'); try { - await this.pubSubClient.close(); + const _pubSubClient = this.pubSubClient; // @ts-ignore delete this.pubSubClient; - this.logger.info('Pub/Sub client stopped.') - process.exit(status); + await _pubSubClient.close(); + this.logger.info('Pub/Sub client stopped.'); } catch (e) { this.logger.info('Pub/Sub client stop error.'); - process.exit(status); } - } else { - process.exit(status); } + this.logger.info('Pub/Sub resources stopped.'); + process.exit(status); } } diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts index 18ce1f06b6..59ec68896d 100644 --- a/msa/js-executor/queue/queue.models.ts +++ b/msa/js-executor/queue/queue.models.ts @@ -17,5 +17,5 @@ export interface IQueue { init(): Promise; send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise; - exit(status: number): Promise; + destroy(status: number): Promise; } diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index d18ba80be7..372024a4f3 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -49,8 +49,6 @@ export class RabbitMqTemplate implements IQueue { async init(): Promise { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`; this.connection = await amqp.connect(url); this.channel = await this.connection.createConfirmChannel(); @@ -78,7 +76,7 @@ export class RabbitMqTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -120,32 +118,33 @@ export class RabbitMqTemplate implements IQueue { return queue; } - async exit(status: number) { + async destroy(status: number) { this.logger.info('Exiting with status: %d ...', status); + this.logger.info('Stopping RabbitMQ resources...'); if (this.channel) { - this.logger.info('Stopping RabbitMq chanel.') - await this.channel.close(); + this.logger.info('Stopping RabbitMQ chanel...'); + const _channel = this.channel; // @ts-ignore delete this.channel; - this.logger.info('RabbitMq chanel stopped'); + await _channel.close(); + this.logger.info('RabbitMQ chanel stopped'); } if (this.connection) { - this.logger.info('Stopping RabbitMq connection.') + this.logger.info('Stopping RabbitMQ connection...') try { - await this.connection.close(); + const _connection = this.connection; // @ts-ignore delete this.connection; - this.logger.info('RabbitMq client connection.') - process.exit(status); + await _connection.close(); + this.logger.info('RabbitMQ client connection.'); } catch (e) { - this.logger.info('RabbitMq connection stop error.'); - process.exit(status); + this.logger.info('RabbitMQ connection stop error.'); } - } else { - process.exit(status); } + this.logger.info('RabbitMQ resources stopped.') + process.exit(status); } } diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index 3cfaf3e10c..b2750672e5 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -49,8 +49,6 @@ export class ServiceBusTemplate implements IQueue { async init() { try { - this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`; this.sbClient = new ServiceBusClient(connectionString) this.serviceBusService = new ServiceBusAdministrationClient(connectionString); @@ -84,7 +82,7 @@ export class ServiceBusTemplate implements IQueue { } catch (e: any) { this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); this.logger.error(e.stack); - await this.exit(-1); + await this.destroy(-1); } } @@ -141,32 +139,45 @@ export class ServiceBusTemplate implements IQueue { return queue; } - async exit(status: number) { + async destroy(status: number) { this.logger.info('Exiting with status: %d ...', status); this.logger.info('Stopping Azure Service Bus resources...') if (this.receiver) { + this.logger.info('Stopping Service Bus Receiver...'); try { - await this.receiver.close(); + const _receiver = this.receiver; // @ts-ignore delete this.receiver; + await _receiver.close(); + this.logger.info('Service Bus Receiver stopped.'); } catch (e) { + this.logger.info('Service Bus Receiver stop error.'); } } - this.senderMap.forEach(k => { - try { - k.close(); - } catch (e) { - } + this.logger.info('Stopping Service Bus Senders...'); + const senders: Promise[] = []; + this.senderMap.forEach((sender) => { + senders.push(sender.close()); }); this.senderMap.clear(); + try { + await Promise.all(senders); + this.logger.info('Service Bus Senders stopped.'); + } catch (e) { + this.logger.info('Service Bus Senders stop error.'); + } if (this.sbClient) { + this.logger.info('Stopping Service Bus Client...'); try { - await this.sbClient.close(); + const _sbClient = this.sbClient; // @ts-ignore delete this.sbClient; + await _sbClient.close(); + this.logger.info('Service Bus Client stopped.'); } catch (e) { + this.logger.info('Service Bus Client stop error.'); } } this.logger.info('Azure Service Bus resources stopped.') diff --git a/msa/js-executor/server.ts b/msa/js-executor/server.ts index 3d59d7fe00..708a87fec9 100644 --- a/msa/js-executor/server.ts +++ b/msa/js-executor/server.ts @@ -32,33 +32,34 @@ logger.info('===CONFIG END==='); const serviceType = config.get('queue_type'); const httpPort = Number(config.get('http_port')); -let queues: IQueue; -let httpServer: HttpServer; +let queues: IQueue | null; +let httpServer: HttpServer | null; (async () => { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); switch (serviceType) { case 'kafka': - logger.info('Starting kafka template.'); + logger.info('Starting Kafka template...'); queues = await KafkaTemplate.build(); - logger.info('kafka template started.'); + logger.info('Kafka template started.'); break; case 'pubsub': - logger.info('Starting Pub/Sub template.') + logger.info('Starting Pub/Sub template...') queues = await PubSubTemplate.build(); logger.info('Pub/Sub template started.') break; case 'aws-sqs': - logger.info('Starting Aws Sqs template.') + logger.info('Starting AWS SQS template...') queues = await AwsSqsTemplate.build(); - logger.info('Aws Sqs template started.') + logger.info('AWS SQS template started.') break; case 'rabbitmq': - logger.info('Starting RabbitMq template.') + logger.info('Starting RabbitMQ template...') queues = await RabbitMqTemplate.build(); - logger.info('RabbitMq template started.') + logger.info('RabbitMQ template started.') break; case 'service-bus': - logger.info('Starting Azure Service Bus template.') + logger.info('Starting Azure Service Bus template...') queues = await ServiceBusTemplate.build(); logger.info('Azure Service Bus template started.') break; @@ -70,17 +71,22 @@ let httpServer: HttpServer; httpServer = new HttpServer(httpPort); })(); -process.on('SIGTERM', () => { - logger.info('SIGTERM signal received'); - process.exit(0); -}); +[`SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`].forEach((eventType) => { + process.on(eventType, async () => { + logger.info(`${eventType} signal received`); + if (httpServer) { + const _httpServer = httpServer; + httpServer = null; + await _httpServer.stop(); + } + if (queues) { + const _queues = queues; + queues = null; + await _queues.destroy(0); + } + }) +}) -process.on('exit', async () => { - if (httpServer) { - httpServer.stop(); - } - if (queues) { - queues.exit(0); - } - logger.info('JavaScript Executor Microservice has been stopped.'); +process.on('exit', (code: number) => { + logger.info(`JavaScript Executor Microservice has been stopped. Exit code: ${code}.`); }); From d3541573b2df8d3eb8237161cfedaa3d450dac36 Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Thu, 7 Jul 2022 17:44:23 +0300 Subject: [PATCH 2/3] Fixed Docker file for JavaScript Executor Microservice --- msa/js-executor/docker/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/msa/js-executor/docker/Dockerfile b/msa/js-executor/docker/Dockerfile index 620d712ad1..d17138d923 100644 --- a/msa/js-executor/docker/Dockerfile +++ b/msa/js-executor/docker/Dockerfile @@ -29,6 +29,7 @@ COPY package/linux/conf ./conf COPY package/linux/conf ./config COPY src/api ./api COPY src/queue ./queue +COPY src/config ./config COPY src/server.js ./ RUN chmod a+x /tmp/*.sh \ From 7019b98c00a9c0a045b446cd571e958762d5bef7 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 7 Jul 2022 19:37:36 +0300 Subject: [PATCH 3/3] JS executor code refactor --- msa/js-executor/package.json | 2 +- msa/js-executor/queue/awsSqsTemplate.ts | 128 +++++++-------- msa/js-executor/queue/kafkaTemplate.ts | 173 +++++++++----------- msa/js-executor/queue/pubSubTemplate.ts | 75 ++++----- msa/js-executor/queue/queue.models.ts | 3 +- msa/js-executor/queue/rabbitmqTemplate.ts | 50 +++--- msa/js-executor/queue/serviceBusTemplate.ts | 70 ++++---- msa/js-executor/server.ts | 77 ++++----- msa/js-executor/yarn.lock | 8 +- 9 files changed, 264 insertions(+), 322 deletions(-) diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 1d1059aa90..6bc7668e15 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -20,7 +20,7 @@ "config": "^3.3.7", "express": "^4.18.1", "js-yaml": "^4.1.0", - "kafkajs": "^2.0.2", + "kafkajs": "^2.1.0", "long": "^5.2.0", "uuid-parse": "^1.1.0", "uuid-random": "^1.3.2", diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 28d421269b..36ec9b5270 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -54,80 +54,76 @@ export class AwsSqsTemplate implements IQueue { FifoQueue: 'true' }; + name = 'AWS SQS'; + constructor() { } async init() { - try { - this.sqsClient = new SQSClient({ - apiVersion: '2012-11-05', - credentials: { - accessKeyId: this.accessKeyId, - secretAccessKey: this.secretAccessKey - }, - region: this.region - }); + this.sqsClient = new SQSClient({ + apiVersion: '2012-11-05', + credentials: { + accessKeyId: this.accessKeyId, + secretAccessKey: this.secretAccessKey + }, + region: this.region + }); - const queues = await this.getQueues(); + const queues = await this.getQueues(); - if (queues.QueueUrls) { - queues.QueueUrls.forEach(queueUrl => { - const delimiterPosition = queueUrl.lastIndexOf('/'); - const queueName = queueUrl.substring(delimiterPosition + 1); - this.queueUrls.set(queueName, queueUrl); - }); - } + if (queues.QueueUrls) { + queues.QueueUrls.forEach(queueUrl => { + const delimiterPosition = queueUrl.lastIndexOf('/'); + const queueName = queueUrl.substring(delimiterPosition + 1); + this.queueUrls.set(queueName, queueUrl); + }); + } - this.parseQueueProperties(); + this.parseQueueProperties(); - this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || ''; - if (!this.requestQueueURL) { - this.requestQueueURL = await this.createQueue(this.requestTopic); - } + this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || ''; + if (!this.requestQueueURL) { + this.requestQueueURL = await this.createQueue(this.requestTopic); + } + + const messageProcessor = new JsInvokeMessageProcessor(this); - const messageProcessor = new JsInvokeMessageProcessor(this); - - const params: ReceiveMessageRequest = { - MaxNumberOfMessages: 10, - QueueUrl: this.requestQueueURL, - WaitTimeSeconds: this.pollInterval / 1000 - }; - while (!this.stopped) { - let pollStartTs = new Date().getTime(); - const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params)); - const messages = messagesResponse.Messages; - - if (messages && messages.length > 0) { - const entries: DeleteMessageBatchRequestEntry[] = []; - - messages.forEach(message => { - entries.push({ - Id: message.MessageId, - ReceiptHandle: message.ReceiptHandle - }); - messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); + const params: ReceiveMessageRequest = { + MaxNumberOfMessages: 10, + QueueUrl: this.requestQueueURL, + WaitTimeSeconds: this.pollInterval / 1000 + }; + while (!this.stopped) { + let pollStartTs = new Date().getTime(); + const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params)); + const messages = messagesResponse.Messages; + + if (messages && messages.length > 0) { + const entries: DeleteMessageBatchRequestEntry[] = []; + + messages.forEach(message => { + entries.push({ + Id: message.MessageId, + ReceiptHandle: message.ReceiptHandle }); + messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); + }); - const deleteBatch: DeleteMessageBatchRequest = { - QueueUrl: this.requestQueueURL, - Entries: entries - }; - try { - await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) - } catch (err: any) { - this.logger.error("Failed to delete messages from queue.", err.message); - } - } else { - let pollDuration = new Date().getTime() - pollStartTs; - if (pollDuration < this.pollInterval) { - await sleep(this.pollInterval - pollDuration); - } + const deleteBatch: DeleteMessageBatchRequest = { + QueueUrl: this.requestQueueURL, + Entries: entries + }; + try { + await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) + } catch (err: any) { + this.logger.error("Failed to delete messages from queue.", err.message); + } + } else { + let pollDuration = new Date().getTime() - pollStartTs; + if (pollDuration < this.pollInterval) { + await sleep(this.pollInterval - pollDuration); } } - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.destroy(-1); } } @@ -185,15 +181,8 @@ export class AwsSqsTemplate implements IQueue { return result.QueueUrl || ''; } - static async build(): Promise { - const queue = new AwsSqsTemplate(); - await queue.init(); - return queue; - } - - async destroy(status: number): Promise { + async destroy(): Promise { this.stopped = true; - this.logger.info('Exiting with status: %d ...', status); this.logger.info('Stopping AWS SQS resources...'); if (this.sqsClient) { this.logger.info('Stopping AWS SQS client...'); @@ -208,6 +197,5 @@ export class AwsSqsTemplate implements IQueue { } } this.logger.info('AWS SQS resources stopped.') - process.exit(status); } } diff --git a/msa/js-executor/queue/kafkaTemplate.ts b/msa/js-executor/queue/kafkaTemplate.ts index 2b3e947b5e..51fa6e291b 100644 --- a/msa/js-executor/queue/kafkaTemplate.ts +++ b/msa/js-executor/queue/kafkaTemplate.ts @@ -51,108 +51,103 @@ export class KafkaTemplate implements IQueue { private batchMessages: TopicMessages[] = []; private sendLoopInstance: NodeJS.Timeout; + name = 'Kafka'; + constructor() { } async init(): Promise { - try { - const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); - const requestTopic: string = config.get('request_topic'); - const useConfluent = config.get('kafka.use_confluent_cloud'); - - this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); - this.logger.info('Kafka Requests Topic: %s', requestTopic); - - let kafkaConfig: KafkaConfig = { - brokers: kafkaBootstrapServers.split(','), - logLevel: logLevel.INFO, - logCreator: KafkaJsWinstonLogCreator - }; + const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); + const requestTopic: string = config.get('request_topic'); + const useConfluent = config.get('kafka.use_confluent_cloud'); - if (this.kafkaClientId) { - kafkaConfig['clientId'] = this.kafkaClientId; - } else { - this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); - } + this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); + this.logger.info('Kafka Requests Topic: %s', requestTopic); - kafkaConfig['requestTimeout'] = this.requestTimeout; + let kafkaConfig: KafkaConfig = { + brokers: kafkaBootstrapServers.split(','), + logLevel: logLevel.INFO, + logCreator: KafkaJsWinstonLogCreator + }; - if (useConfluent) { - kafkaConfig['sasl'] = { - mechanism: config.get('kafka.confluent.sasl.mechanism') as any, - username: config.get('kafka.confluent.username'), - password: config.get('kafka.confluent.password') - }; - kafkaConfig['ssl'] = true; - } + if (this.kafkaClientId) { + kafkaConfig['clientId'] = this.kafkaClientId; + } else { + this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); + } - this.parseTopicProperties(); + kafkaConfig['requestTimeout'] = this.requestTimeout; - this.kafkaClient = new Kafka(kafkaConfig); - this.kafkaAdmin = this.kafkaClient.admin(); - await this.kafkaAdmin.connect(); + if (useConfluent) { + kafkaConfig['sasl'] = { + mechanism: config.get('kafka.confluent.sasl.mechanism') as any, + username: config.get('kafka.confluent.username'), + password: config.get('kafka.confluent.password') + }; + kafkaConfig['ssl'] = true; + } - let partitions = 1; + this.parseTopicProperties(); - for (let i = 0; i < this.configEntries.length; i++) { - let param = this.configEntries[i]; - if (param.name === 'partitions') { - partitions = param.value; - this.configEntries.splice(i, 1); - break; - } - } + this.kafkaClient = new Kafka(kafkaConfig); + this.kafkaAdmin = this.kafkaClient.admin(); + await this.kafkaAdmin.connect(); - let topics = await this.kafkaAdmin.listTopics(); + let partitions = 1; - if (!topics.includes(requestTopic)) { - let createRequestTopicResult = await this.createTopic(requestTopic, partitions); - if (createRequestTopicResult) { - this.logger.info('Created new topic: %s', requestTopic); - } + for (let i = 0; i < this.configEntries.length; i++) { + let param = this.configEntries[i]; + if (param.name === 'partitions') { + partitions = param.value; + this.configEntries.splice(i, 1); + break; } + } - this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'}); - this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner}); + let topics = await this.kafkaAdmin.listTopics(); - const {CRASH} = this.consumer.events; + if (!topics.includes(requestTopic)) { + let createRequestTopicResult = await this.createTopic(requestTopic, partitions); + if (createRequestTopicResult) { + this.logger.info('Created new topic: %s', requestTopic); + } + } - this.consumer.on(CRASH, async (e) => { - this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); - if (!e.payload.restart) { - this.logger.error('Going to exit due to not retryable error!'); - await this.destroy(-1); - } - }); + this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'}); + this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner}); - const messageProcessor = new JsInvokeMessageProcessor(this); - await this.consumer.connect(); - await this.producer.connect(); - this.sendLoopWithLinger(); - await this.consumer.subscribe({topic: requestTopic}); - - await this.consumer.run({ - partitionsConsumedConcurrently: this.partitionsConsumedConcurrently, - eachMessage: async ({topic, partition, message}) => { - let headers = message.headers; - let key = message.key || new Buffer([]); - let msg = { - key: key.toString('utf8'), - data: message.value, - headers: { - data: headers - } - }; - messageProcessor.onJsInvokeMessage(msg); - }, - }); + const {CRASH} = this.consumer.events; - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.destroy(-1); - } - } + this.consumer.on(CRASH, async (e) => { + this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); + if (!e.payload.restart) { + this.logger.error('Going to exit due to not retryable error!'); + await this.destroy(); + } + }); + + const messageProcessor = new JsInvokeMessageProcessor(this); + await this.consumer.connect(); + await this.producer.connect(); + this.sendLoopWithLinger(); + await this.consumer.subscribe({topic: requestTopic}); + + await this.consumer.run({ + partitionsConsumedConcurrently: this.partitionsConsumedConcurrently, + eachMessage: async ({topic, partition, message}) => { + let headers = message.headers; + let key = message.key || new Buffer([]); + let msg = { + key: key.toString('utf8'), + data: message.value, + headers: { + data: headers + } + }; + messageProcessor.onJsInvokeMessage(msg); + }, + }); +} async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { this.logger.debug('Pending queue response, scriptId: [%s]', scriptId); @@ -232,15 +227,7 @@ export class KafkaTemplate implements IQueue { }, this.linger); } - static async build(): Promise { - const queue = new KafkaTemplate(); - await queue.init(); - return queue; - } - - - async destroy(status: number): Promise { - this.logger.info('Exiting with status: %d ...', status); + async destroy(): Promise { this.logger.info('Stopping Kafka resources...'); if (this.kafkaAdmin) { @@ -267,7 +254,6 @@ export class KafkaTemplate implements IQueue { } } this.logger.info('Kafka resources stopped.'); - process.exit(status); } private async disconnectProducer(): Promise { @@ -287,4 +273,5 @@ export class KafkaTemplate implements IQueue { } } } + } diff --git a/msa/js-executor/queue/pubSubTemplate.ts b/msa/js-executor/queue/pubSubTemplate.ts index 4e8990a105..eff35017ba 100644 --- a/msa/js-executor/queue/pubSubTemplate.ts +++ b/msa/js-executor/queue/pubSubTemplate.ts @@ -34,55 +34,50 @@ export class PubSubTemplate implements IQueue { private topics: string[] = []; private subscriptions: string[] = []; + name = 'Pub/Sub'; + constructor() { } async init() { - try { - this.pubSubClient = new PubSub({ - projectId: this.projectId, - credentials: this.credentials - }); - - this.parseQueueProperties(); + this.pubSubClient = new PubSub({ + projectId: this.projectId, + credentials: this.credentials + }); - const topicList = await this.pubSubClient.getTopics(); + this.parseQueueProperties(); - if (topicList) { - topicList[0].forEach(topic => { - this.topics.push(PubSubTemplate.getName(topic.name)); - }); - } + const topicList = await this.pubSubClient.getTopics(); - const subscriptionList = await this.pubSubClient.getSubscriptions(); + if (topicList) { + topicList[0].forEach(topic => { + this.topics.push(PubSubTemplate.getName(topic.name)); + }); + } - if (subscriptionList) { - topicList[0].forEach(sub => { - this.subscriptions.push(PubSubTemplate.getName(sub.name)); - }); - } + const subscriptionList = await this.pubSubClient.getSubscriptions(); - if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) { - await this.createTopic(this.requestTopic); - await this.createSubscription(this.requestTopic); - } + if (subscriptionList) { + topicList[0].forEach(sub => { + this.subscriptions.push(PubSubTemplate.getName(sub.name)); + }); + } - const subscription = this.pubSubClient.subscription(this.requestTopic); + if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) { + await this.createTopic(this.requestTopic); + await this.createSubscription(this.requestTopic); + } - const messageProcessor = new JsInvokeMessageProcessor(this); + const subscription = this.pubSubClient.subscription(this.requestTopic); - const messageHandler = (message: Message) => { - messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); - message.ack(); - }; + const messageProcessor = new JsInvokeMessageProcessor(this); - subscription.on('message', messageHandler); + const messageHandler = (message: Message) => { + messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); + message.ack(); + }; - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.destroy(-1); - } + subscription.on('message', messageHandler); } async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { @@ -146,14 +141,7 @@ export class PubSubTemplate implements IQueue { } } - static async build(): Promise { - const queue = new PubSubTemplate(); - await queue.init(); - return queue; - } - - async destroy(status: number): Promise { - this.logger.info('Exiting with status: %d ...', status); + async destroy(): Promise { this.logger.info('Stopping Pub/Sub resources...'); if (this.pubSubClient) { this.logger.info('Stopping Pub/Sub client...'); @@ -168,7 +156,6 @@ export class PubSubTemplate implements IQueue { } } this.logger.info('Pub/Sub resources stopped.'); - process.exit(status); } } diff --git a/msa/js-executor/queue/queue.models.ts b/msa/js-executor/queue/queue.models.ts index 59ec68896d..a86dc8fd1d 100644 --- a/msa/js-executor/queue/queue.models.ts +++ b/msa/js-executor/queue/queue.models.ts @@ -15,7 +15,8 @@ /// export interface IQueue { + name: string; init(): Promise; send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise; - destroy(status: number): Promise; + destroy(): Promise; } diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index 372024a4f3..9369f170f5 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -44,39 +44,35 @@ export class RabbitMqTemplate implements IQueue { private stopped = false; private topics: string[] = []; + name = 'RabbitMQ'; + constructor() { } async init(): Promise { - try { - const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`; - this.connection = await amqp.connect(url); - this.channel = await this.connection.createConfirmChannel(); + const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`; + this.connection = await amqp.connect(url); + this.channel = await this.connection.createConfirmChannel(); - this.parseQueueProperties(); + this.parseQueueProperties(); - await this.createQueue(this.requestTopic); + await this.createQueue(this.requestTopic); - const messageProcessor = new JsInvokeMessageProcessor(this); + const messageProcessor = new JsInvokeMessageProcessor(this); - while (!this.stopped) { - let pollStartTs = new Date().getTime(); - let message = await this.channel.get(this.requestTopic); + while (!this.stopped) { + let pollStartTs = new Date().getTime(); + let message = await this.channel.get(this.requestTopic); - if (message) { - messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); - this.channel.ack(message); - } else { - let pollDuration = new Date().getTime() - pollStartTs; - if (pollDuration < this.pollInterval) { - await sleep(this.pollInterval - pollDuration); - } + if (message) { + messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); + this.channel.ack(message); + } else { + let pollDuration = new Date().getTime() - pollStartTs; + if (pollDuration < this.pollInterval) { + await sleep(this.pollInterval - pollDuration); } } - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.destroy(-1); } } @@ -112,14 +108,7 @@ export class RabbitMqTemplate implements IQueue { return this.channel.assertQueue(topic, this.queueOptions); } - static async build(): Promise { - const queue = new RabbitMqTemplate(); - await queue.init(); - return queue; - } - - async destroy(status: number) { - this.logger.info('Exiting with status: %d ...', status); + async destroy() { this.logger.info('Stopping RabbitMQ resources...'); if (this.channel) { @@ -144,7 +133,6 @@ export class RabbitMqTemplate implements IQueue { } } this.logger.info('RabbitMQ resources stopped.') - process.exit(status); } } diff --git a/msa/js-executor/queue/serviceBusTemplate.ts b/msa/js-executor/queue/serviceBusTemplate.ts index b2750672e5..76d87e8068 100644 --- a/msa/js-executor/queue/serviceBusTemplate.ts +++ b/msa/js-executor/queue/serviceBusTemplate.ts @@ -44,46 +44,42 @@ export class ServiceBusTemplate implements IQueue { private receiver: ServiceBusReceiver; private senderMap = new Map(); + name = 'Azure Service Bus'; + constructor() { } async init() { - try { - const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`; - this.sbClient = new ServiceBusClient(connectionString) - this.serviceBusService = new ServiceBusAdministrationClient(connectionString); + const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`; + this.sbClient = new ServiceBusClient(connectionString) + this.serviceBusService = new ServiceBusAdministrationClient(connectionString); - this.parseQueueProperties(); + this.parseQueueProperties(); - const listQueues = await this.serviceBusService.listQueues(); - for await (const queue of listQueues) { - this.queues.push(queue.name); - } - - if (!this.queues.includes(this.requestTopic)) { - await this.createQueueIfNotExist(this.requestTopic); - this.queues.push(this.requestTopic); - } + const listQueues = await this.serviceBusService.listQueues(); + for await (const queue of listQueues) { + this.queues.push(queue.name); + } - this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'}); - - const messageProcessor = new JsInvokeMessageProcessor(this); - - const messageHandler = async (message: ServiceBusReceivedMessage) => { - if (message) { - messageProcessor.onJsInvokeMessage(message.body); - await this.receiver.completeMessage(message); - } - }; - const errorHandler = async (error: ProcessErrorArgs) => { - this.logger.error('Failed to receive message from queue.', error); - }; - this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) - } catch (e: any) { - this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - this.logger.error(e.stack); - await this.destroy(-1); + if (!this.queues.includes(this.requestTopic)) { + await this.createQueueIfNotExist(this.requestTopic); + this.queues.push(this.requestTopic); } + + this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'}); + + const messageProcessor = new JsInvokeMessageProcessor(this); + + const messageHandler = async (message: ServiceBusReceivedMessage) => { + if (message) { + messageProcessor.onJsInvokeMessage(message.body); + await this.receiver.completeMessage(message); + } + }; + const errorHandler = async (error: ProcessErrorArgs) => { + this.logger.error('Failed to receive message from queue.', error); + }; + this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) } async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { @@ -133,14 +129,7 @@ export class ServiceBusTemplate implements IQueue { } } - static async build(): Promise { - const queue = new ServiceBusTemplate(); - await queue.init(); - return queue; - } - - async destroy(status: number) { - this.logger.info('Exiting with status: %d ...', status); + async destroy() { this.logger.info('Stopping Azure Service Bus resources...') if (this.receiver) { this.logger.info('Stopping Service Bus Receiver...'); @@ -181,6 +170,5 @@ export class ServiceBusTemplate implements IQueue { } } this.logger.info('Azure Service Bus resources stopped.') - process.exit(status); } } diff --git a/msa/js-executor/server.ts b/msa/js-executor/server.ts index 708a87fec9..446f9d34aa 100644 --- a/msa/js-executor/server.ts +++ b/msa/js-executor/server.ts @@ -30,63 +30,66 @@ logger.info('===CONFIG BEGIN==='); logger.info(JSON.stringify(config, null, 4)); logger.info('===CONFIG END==='); -const serviceType = config.get('queue_type'); +const serviceType: string = config.get('queue_type'); const httpPort = Number(config.get('http_port')); let queues: IQueue | null; let httpServer: HttpServer | null; (async () => { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + try { + queues = await createQueue(serviceType); + logger.info(`Starting ${queues.name} template...`); + await queues.init(); + logger.info(`${queues.name} template started.`); + httpServer = new HttpServer(httpPort); + } catch (e: any) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + await exit(-1); + } + +})(); + +async function createQueue(serviceType: string): Promise { switch (serviceType) { case 'kafka': - logger.info('Starting Kafka template...'); - queues = await KafkaTemplate.build(); - logger.info('Kafka template started.'); - break; + return new KafkaTemplate(); case 'pubsub': - logger.info('Starting Pub/Sub template...') - queues = await PubSubTemplate.build(); - logger.info('Pub/Sub template started.') - break; + return new PubSubTemplate(); case 'aws-sqs': - logger.info('Starting AWS SQS template...') - queues = await AwsSqsTemplate.build(); - logger.info('AWS SQS template started.') - break; + return new AwsSqsTemplate(); case 'rabbitmq': - logger.info('Starting RabbitMQ template...') - queues = await RabbitMqTemplate.build(); - logger.info('RabbitMQ template started.') - break; + return new RabbitMqTemplate(); case 'service-bus': - logger.info('Starting Azure Service Bus template...') - queues = await ServiceBusTemplate.build(); - logger.info('Azure Service Bus template started.') - break; + return new ServiceBusTemplate(); default: - logger.error('Unknown service type: ', serviceType); - process.exit(-1); + throw new Error('Unknown service type: ' + serviceType); } - - httpServer = new HttpServer(httpPort); -})(); +} [`SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`].forEach((eventType) => { process.on(eventType, async () => { logger.info(`${eventType} signal received`); - if (httpServer) { - const _httpServer = httpServer; - httpServer = null; - await _httpServer.stop(); - } - if (queues) { - const _queues = queues; - queues = null; - await _queues.destroy(0); - } + await exit(0); }) }) process.on('exit', (code: number) => { - logger.info(`JavaScript Executor Microservice has been stopped. Exit code: ${code}.`); + logger.info(`ThingsBoard JavaScript Executor Microservice has been stopped. Exit code: ${code}.`); }); + +async function exit(status: number) { + logger.info('Exiting with status: %d ...', status); + if (httpServer) { + const _httpServer = httpServer; + httpServer = null; + await _httpServer.stop(); + } + if (queues) { + const _queues = queues; + queues = null; + await _queues.destroy(); + } + process.exit(status); +} diff --git a/msa/js-executor/yarn.lock b/msa/js-executor/yarn.lock index ed6e7406f9..eb6443e53d 100644 --- a/msa/js-executor/yarn.lock +++ b/msa/js-executor/yarn.lock @@ -2670,10 +2670,10 @@ jws@^4.0.0: jwa "^2.0.0" safe-buffer "^5.0.1" -kafkajs@^2.0.2: - version "2.0.2" - resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.0.2.tgz#cdfc8f57aa4fd69f6d9ca1cce4ee89bbc2a3a1f9" - integrity sha512-g6CM3fAenofOjR1bfOAqeZUEaSGhNtBscNokybSdW1rmIKYNwBPC9xQzwulFJm36u/xcxXUiCl/L/qfslapihA== +kafkajs@^2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-2.1.0.tgz#32ede4e8080cc75586c5e4406eeb582fa73f7b1e" + integrity sha512-6IYiOdGWvFPbSbVB+AV3feT+A7vzw5sXm7Ze4QTfP7FRNdY8pGcpiNPvD2lfgYFD8Dm9KbMgBgTt2mf8KaIkzw== keyv@^3.0.0: version "3.1.0"