24 changed files with 1375 additions and 150 deletions
@ -0,0 +1,199 @@ |
|||
/* |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('awsSqsTemplate'); |
|||
|
|||
const requestTopic = config.get('request_topic'); |
|||
|
|||
const accessKeyId = config.get('aws_sqs.access_key_id'); |
|||
const secretAccessKey = config.get('aws_sqs.secret_access_key'); |
|||
const region = config.get('aws_sqs.region'); |
|||
const AWS = require('aws-sdk'); |
|||
const queueProperties = config.get('aws_sqs.queue-properties'); |
|||
const poolInterval = config.get('js.response_poll_interval'); |
|||
|
|||
let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'}; |
|||
let sqsClient; |
|||
let requestQueueURL; |
|||
const queueUrls = new Map(); |
|||
let stopped = false; |
|||
|
|||
function AwsSqsProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
let msgBody = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
|
|||
let responseQueueUrl = queueUrls.get(topicToSqsQueueName(responseTopic)); |
|||
|
|||
if (!responseQueueUrl) { |
|||
responseQueueUrl = await createQueue(responseTopic); |
|||
queueUrls.set(responseTopic, responseQueueUrl); |
|||
} |
|||
|
|||
let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId}; |
|||
|
|||
return new Promise((resolve, reject) => { |
|||
sqsClient.sendMessage(params, function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
AWS.config.update({accessKeyId: accessKeyId, secretAccessKey: secretAccessKey, region: region}); |
|||
|
|||
sqsClient = new AWS.SQS({apiVersion: '2012-11-05'}); |
|||
|
|||
const queues = await getQueues(); |
|||
|
|||
queues.forEach(queueUrl => { |
|||
const delimiterPosition = queueUrl.lastIndexOf('/'); |
|||
const queueName = queueUrl.substring(delimiterPosition + 1); |
|||
queueUrls.set(queueName, queueUrl); |
|||
}) |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
requestQueueURL = queueUrls.get(topicToSqsQueueName(requestTopic)); |
|||
if (!requestQueueURL) { |
|||
requestQueueURL = await createQueue(requestTopic); |
|||
} |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new AwsSqsProducer()); |
|||
|
|||
const params = { |
|||
MaxNumberOfMessages: 10, |
|||
QueueUrl: requestQueueURL, |
|||
WaitTimeSeconds: poolInterval / 1000 |
|||
}; |
|||
while (!stopped) { |
|||
const messages = await new Promise((resolve, reject) => { |
|||
sqsClient.receiveMessage(params, function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data.Messages); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
if (messages && messages.length > 0) { |
|||
const entries = []; |
|||
|
|||
messages.forEach(message => { |
|||
entries.push({ |
|||
Id: message.MessageId, |
|||
ReceiptHandle: message.ReceiptHandle |
|||
}); |
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.Body)); |
|||
}); |
|||
|
|||
const deleteBatch = { |
|||
QueueUrl: requestQueueURL, |
|||
Entries: entries |
|||
}; |
|||
sqsClient.deleteMessageBatch(deleteBatch, function (err, data) { |
|||
if (err) { |
|||
logger.error("Failed to delete messages from queue.", err.message); |
|||
} else { |
|||
//do nothing
|
|||
} |
|||
}); |
|||
} |
|||
} |
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
function createQueue(topic) { |
|||
let queueName = topicToSqsQueueName(topic); |
|||
let queueParams = {QueueName: queueName, Attributes: queueAttributes}; |
|||
|
|||
return new Promise((resolve, reject) => { |
|||
sqsClient.createQueue(queueParams, function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data.QueueUrl); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function getQueues() { |
|||
return new Promise((resolve, reject) => { |
|||
sqsClient.listQueues(function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data.QueueUrls); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function topicToSqsQueueName(topic) { |
|||
return topic.replace(/\./g, '_') + '.fifo'; |
|||
} |
|||
|
|||
function parseQueueProperties() { |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
queueAttributes[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
stopped = true; |
|||
logger.info('Aws Sqs client stopped.'); |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
if (sqsClient) { |
|||
logger.info('Stopping Aws Sqs client.') |
|||
try { |
|||
await sqsClient.close(); |
|||
logger.info('Aws Sqs client stopped.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('Aws Sqs client stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
@ -0,0 +1,181 @@ |
|||
/* |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
const {logLevel, Kafka} = require('kafkajs'); |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('kafkaTemplate'), |
|||
KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; |
|||
const replicationFactor = config.get('kafka.replication_factor'); |
|||
const topicProperties = config.get('kafka.topic-properties'); |
|||
|
|||
let kafkaClient; |
|||
let kafkaAdmin; |
|||
let consumer; |
|||
let producer; |
|||
|
|||
const topics = []; |
|||
const configEntries = []; |
|||
|
|||
function KafkaProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
|
|||
if (!topics.includes(responseTopic)) { |
|||
let createResponseTopicResult = await createTopic(responseTopic); |
|||
topics.push(responseTopic); |
|||
if (createResponseTopicResult) { |
|||
logger.info('Created new topic: %s', requestTopic); |
|||
} |
|||
} |
|||
|
|||
let headersData = headers.data; |
|||
headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)])); |
|||
return producer.send( |
|||
{ |
|||
topic: responseTopic, |
|||
messages: [ |
|||
{ |
|||
key: scriptId, |
|||
value: rawResponse, |
|||
headers: headersData |
|||
} |
|||
] |
|||
}); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); |
|||
const requestTopic = config.get('request_topic'); |
|||
|
|||
logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); |
|||
logger.info('Kafka Requests Topic: %s', requestTopic); |
|||
|
|||
kafkaClient = new Kafka({ |
|||
brokers: kafkaBootstrapServers.split(','), |
|||
logLevel: logLevel.INFO, |
|||
logCreator: KafkaJsWinstonLogCreator |
|||
}); |
|||
|
|||
parseTopicProperties(); |
|||
|
|||
kafkaAdmin = kafkaClient.admin(); |
|||
await kafkaAdmin.connect(); |
|||
|
|||
let createRequestTopicResult = await createTopic(requestTopic); |
|||
|
|||
if (createRequestTopicResult) { |
|||
logger.info('Created new topic: %s', requestTopic); |
|||
} |
|||
|
|||
consumer = kafkaClient.consumer({groupId: 'js-executor-group'}); |
|||
producer = kafkaClient.producer(); |
|||
const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); |
|||
await consumer.connect(); |
|||
await producer.connect(); |
|||
await consumer.subscribe({topic: requestTopic}); |
|||
|
|||
logger.info('Started ThingsBoard JavaScript Executor Microservice.'); |
|||
await consumer.run({ |
|||
eachMessage: async ({topic, partition, message}) => { |
|||
let headers = message.headers; |
|||
let key = message.key; |
|||
let data = message.value; |
|||
let msg = {}; |
|||
|
|||
headers = Object.fromEntries( |
|||
Object.entries(headers).map(([key, value]) => [key, [...value]])); |
|||
|
|||
msg.key = key.toString('utf8'); |
|||
msg.data = [...data]; |
|||
msg.headers = {data: headers} |
|||
messageProcessor.onJsInvokeMessage(msg); |
|||
}, |
|||
}); |
|||
|
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
function createTopic(topic) { |
|||
return kafkaAdmin.createTopics({ |
|||
topics: [{ |
|||
topic: topic, |
|||
replicationFactor: replicationFactor, |
|||
configEntries: configEntries |
|||
}] |
|||
}); |
|||
} |
|||
|
|||
function parseTopicProperties() { |
|||
const props = topicProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
configEntries.push({name: p.substring(0, delimiterPosition), value: p.substring(delimiterPosition + 1)}); |
|||
}); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
|
|||
if (kafkaAdmin) { |
|||
logger.info('Stopping Kafka Admin...'); |
|||
await kafkaAdmin.disconnect(); |
|||
logger.info('Kafka Admin stopped.'); |
|||
} |
|||
|
|||
if (consumer) { |
|||
logger.info('Stopping Kafka Consumer...'); |
|||
let _consumer = consumer; |
|||
consumer = null; |
|||
try { |
|||
await _consumer.disconnect(); |
|||
logger.info('Kafka Consumer stopped.'); |
|||
await disconnectProducer(); |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('Kafka Consumer stop error.'); |
|||
await disconnectProducer(); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
|
|||
async function disconnectProducer() { |
|||
if (producer) { |
|||
logger.info('Stopping Kafka Producer...'); |
|||
var _producer = producer; |
|||
producer = null; |
|||
try { |
|||
await _producer.disconnect(); |
|||
logger.info('Kafka Producer stopped.'); |
|||
} catch (e) { |
|||
logger.info('Kafka Producer stop error.'); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,154 @@ |
|||
/* |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('pubSubTemplate'); |
|||
const {PubSub} = require('@google-cloud/pubsub'); |
|||
|
|||
const projectId = config.get('pubsub.project_id'); |
|||
const credentials = JSON.parse(config.get('pubsub.service_account')); |
|||
const requestTopic = config.get('request_topic'); |
|||
const queueProperties = config.get('pubsub.queue-properties'); |
|||
|
|||
let pubSubClient; |
|||
|
|||
const topics = []; |
|||
const subscriptions = []; |
|||
const queueProps = []; |
|||
|
|||
function PubSubProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
|
|||
if (!(subscriptions.includes(responseTopic) && topics.includes(requestTopic))) { |
|||
await createTopic(requestTopic); |
|||
} |
|||
|
|||
let data = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
let dataBuffer = Buffer.from(data); |
|||
return pubSubClient.topic(responseTopic).publish(dataBuffer); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
pubSubClient = new PubSub({projectId: projectId, credentials: credentials}); |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
const topicList = await pubSubClient.getTopics(); |
|||
|
|||
if (topicList) { |
|||
topicList[0].forEach(topic => { |
|||
topics.push(getName(topic.name)); |
|||
}); |
|||
} |
|||
|
|||
const subscriptionList = await pubSubClient.getSubscriptions(); |
|||
|
|||
if (subscriptionList) { |
|||
topicList[0].forEach(sub => { |
|||
subscriptions.push(getName(sub.name)); |
|||
}); |
|||
} |
|||
|
|||
if (!(subscriptions.includes(requestTopic) && topics.includes(requestTopic))) { |
|||
await createTopic(requestTopic); |
|||
} |
|||
|
|||
const subscription = pubSubClient.subscription(requestTopic); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new PubSubProducer()); |
|||
|
|||
const messageHandler = message => { |
|||
|
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); |
|||
message.ack(); |
|||
}; |
|||
|
|||
subscription.on('message', messageHandler); |
|||
|
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
async function createTopic(topic) { |
|||
if (!topics.includes(topic)) { |
|||
await pubSubClient.createTopic(topic); |
|||
topics.push(topic); |
|||
logger.info('Created new Pub/Sub topic: %s', topic); |
|||
} |
|||
await createSubscription(topic) |
|||
} |
|||
|
|||
async function createSubscription(topic) { |
|||
if (!subscriptions.includes(topic)) { |
|||
await pubSubClient.createSubscription(topic, topic, { |
|||
topic: topic, |
|||
subscription: topic, |
|||
ackDeadlineSeconds: queueProps['ackDeadlineInSec'], |
|||
messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']} |
|||
}); |
|||
subscriptions.push(topic); |
|||
logger.info('Created new Pub/Sub subscription: %s', topic); |
|||
} |
|||
} |
|||
|
|||
function parseQueueProperties() { |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
queueProps[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
} |
|||
|
|||
function getName(fullName) { |
|||
const delimiterPosition = fullName.lastIndexOf('/'); |
|||
return fullName.substring(delimiterPosition + 1); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
if (pubSubClient) { |
|||
logger.info('Stopping Pub/Sub client.') |
|||
try { |
|||
await pubSubClient.close(); |
|||
logger.info('Pub/Sub client stopped.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('Pub/Sub client stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,177 @@ |
|||
/* |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('rabbitmqTemplate'); |
|||
|
|||
const requestTopic = config.get('request_topic'); |
|||
const host = config.get('rabbitmq.host'); |
|||
const port = config.get('rabbitmq.port'); |
|||
const vhost = config.get('rabbitmq.virtual_host'); |
|||
const username = config.get('rabbitmq.username'); |
|||
const password = config.get('rabbitmq.password'); |
|||
const queueProperties = config.get('rabbitmq.queue-properties'); |
|||
const poolInterval = config.get('js.response_poll_interval'); |
|||
|
|||
const amqp = require('amqplib/callback_api'); |
|||
|
|||
let queueParams = {durable: false, exclusive: false, autoDelete: false}; |
|||
let connection; |
|||
let channel; |
|||
let stopped = false; |
|||
const responseTopics = []; |
|||
|
|||
function RabbitMqProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
|
|||
if (!responseTopics.includes(responseTopic)) { |
|||
await createQueue(responseTopic); |
|||
responseTopics.push(responseTopic); |
|||
} |
|||
|
|||
let data = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
let dataBuffer = Buffer.from(data); |
|||
channel.sendToQueue(responseTopic, dataBuffer); |
|||
return new Promise((resolve, reject) => { |
|||
channel.waitForConfirms((err) => { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
const url = `amqp://${host}:${port}${vhost}`; |
|||
|
|||
amqp.credentials.amqplain(username, password); |
|||
connection = await new Promise((resolve, reject) => { |
|||
amqp.connect(url, function (err, connection) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(connection); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
channel = await new Promise((resolve, reject) => { |
|||
connection.createConfirmChannel(function (err, channel) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(channel); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
await createQueue(requestTopic); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer()); |
|||
|
|||
while (!stopped) { |
|||
let message = await new Promise((resolve, reject) => { |
|||
channel.get(requestTopic, {}, function (err, msg) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(msg); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
if (message) { |
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); |
|||
channel.ack(message); |
|||
} else { |
|||
await sleep(poolInterval); |
|||
} |
|||
} |
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
function parseQueueProperties() { |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
} |
|||
|
|||
function createQueue(topic) { |
|||
return new Promise((resolve, reject) => { |
|||
channel.assertQueue(topic, queueParams, function (err) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function sleep(ms) { |
|||
return new Promise((resolve) => { |
|||
setTimeout(resolve, ms); |
|||
}); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
|
|||
if (channel) { |
|||
logger.info('Stopping RabbitMq chanel.') |
|||
await channel.close(); |
|||
logger.info('RabbitMq chanel stopped'); |
|||
} |
|||
|
|||
if (connection) { |
|||
logger.info('Stopping RabbitMq connection.') |
|||
try { |
|||
await connection.close(); |
|||
logger.info('RabbitMq client connection.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('RabbitMq connection stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
@ -0,0 +1,194 @@ |
|||
/* |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('serviceBusTemplate'); |
|||
const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus"); |
|||
const azure = require('azure-sb'); |
|||
|
|||
const requestTopic = config.get('request_topic'); |
|||
const namespaceName = config.get('service_bus.namespace_name'); |
|||
const sasKeyName = config.get('service_bus.sas_key_name'); |
|||
const sasKey = config.get('service_bus.sas_key'); |
|||
const queueProperties = config.get('service_bus.queue-properties'); |
|||
|
|||
let sbClient; |
|||
let receiverClient; |
|||
let receiver; |
|||
let serviceBusService; |
|||
|
|||
let queueOptions = {}; |
|||
const queues = []; |
|||
const senderMap = new Map(); |
|||
|
|||
function ServiceBusProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
if (!queues.includes(requestTopic)) { |
|||
await createQueueIfNotExist(requestTopic); |
|||
queues.push(requestTopic); |
|||
} |
|||
|
|||
let customSender = senderMap.get(responseTopic); |
|||
|
|||
if (!customSender) { |
|||
customSender = new CustomSender(responseTopic); |
|||
senderMap.set(responseTopic, customSender); |
|||
} |
|||
|
|||
let data = { |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}; |
|||
|
|||
return customSender.send({body: data}); |
|||
} |
|||
} |
|||
|
|||
function CustomSender(topic) { |
|||
this.queueClient = sbClient.createQueueClient(topic); |
|||
this.sender = this.queueClient.createSender(); |
|||
|
|||
this.send = async (message) => { |
|||
return this.sender.send(message); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`; |
|||
sbClient = ServiceBusClient.createFromConnectionString(connectionString); |
|||
serviceBusService = azure.createServiceBusService(connectionString); |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
await new Promise((resolve, reject) => { |
|||
serviceBusService.listQueues((err, data) => { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
data.forEach(queue => { |
|||
queues.push(queue.QueueName); |
|||
}); |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
if (!queues.includes(requestTopic)) { |
|||
await createQueueIfNotExist(requestTopic); |
|||
queues.push(requestTopic); |
|||
} |
|||
|
|||
receiverClient = sbClient.createQueueClient(requestTopic); |
|||
receiver = receiverClient.createReceiver(ReceiveMode.peekLock); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer()); |
|||
|
|||
const messageHandler = async (message) => { |
|||
if (message) { |
|||
messageProcessor.onJsInvokeMessage(message.body); |
|||
await message.complete(); |
|||
} |
|||
}; |
|||
const errorHandler = (error) => { |
|||
logger.error('Failed to receive message from queue.', error); |
|||
}; |
|||
receiver.registerMessageHandler(messageHandler, errorHandler); |
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
async function createQueueIfNotExist(topic) { |
|||
return new Promise((resolve, reject) => { |
|||
serviceBusService.createQueueIfNotExists(topic, queueOptions, (err) => { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function parseQueueProperties() { |
|||
let properties = {}; |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
queueOptions = { |
|||
MaxSizeInMegabytes: properties['maxSizeInMb'], |
|||
DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, |
|||
LockDuration: `PT${properties['lockDurationInSec']}S` |
|||
}; |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
logger.info('Stopping Azure Service Bus resources...') |
|||
if (receiver) { |
|||
try { |
|||
await receiver.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
} |
|||
|
|||
if (receiverClient) { |
|||
try { |
|||
await receiverClient.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
} |
|||
|
|||
senderMap.forEach((k, v) => { |
|||
try { |
|||
v.sender.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
try { |
|||
v.queueClient.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
}); |
|||
|
|||
if (sbClient) { |
|||
try { |
|||
sbClient.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
} |
|||
logger.info('Azure Service Bus resources stopped.') |
|||
process.exit(status); |
|||
} |
|||
Loading…
Reference in new issue