|
|
|
@ -18,8 +18,7 @@ |
|
|
|
const config = require('config'), |
|
|
|
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|
|
|
logger = require('../config/logger')._logger('serviceBusTemplate'); |
|
|
|
const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus"); |
|
|
|
const azure = require('azure-sb'); |
|
|
|
const {ServiceBusClient, ServiceBusAdministrationClient} = require("@azure/service-bus"); |
|
|
|
|
|
|
|
const requestTopic = config.get('request_topic'); |
|
|
|
const namespaceName = config.get('service_bus.namespace_name'); |
|
|
|
@ -28,7 +27,6 @@ const sasKey = config.get('service_bus.sas_key'); |
|
|
|
const queueProperties = config.get('service_bus.queue_properties'); |
|
|
|
|
|
|
|
let sbClient; |
|
|
|
let receiverClient; |
|
|
|
let receiver; |
|
|
|
let serviceBusService; |
|
|
|
|
|
|
|
@ -61,11 +59,10 @@ function ServiceBusProducer() { |
|
|
|
} |
|
|
|
|
|
|
|
function CustomSender(topic) { |
|
|
|
this.queueClient = sbClient.createQueueClient(topic); |
|
|
|
this.sender = this.queueClient.createSender(); |
|
|
|
this.sender = sbClient.createSender(topic); |
|
|
|
|
|
|
|
this.send = async (message) => { |
|
|
|
return this.sender.send(message); |
|
|
|
return this.sender.sendMessages(message); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -74,8 +71,8 @@ function CustomSender(topic) { |
|
|
|
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|
|
|
|
|
|
|
const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`; |
|
|
|
sbClient = ServiceBusClient.createFromConnectionString(connectionString); |
|
|
|
serviceBusService = azure.createServiceBusService(connectionString); |
|
|
|
sbClient = new ServiceBusClient(connectionString) |
|
|
|
serviceBusService = new ServiceBusAdministrationClient(connectionString); |
|
|
|
|
|
|
|
parseQueueProperties(); |
|
|
|
|
|
|
|
@ -84,9 +81,9 @@ function CustomSender(topic) { |
|
|
|
if (err) { |
|
|
|
reject(err); |
|
|
|
} else { |
|
|
|
data.forEach(queue => { |
|
|
|
queues.push(queue.QueueName); |
|
|
|
}); |
|
|
|
for (const queue of data) { |
|
|
|
queues.push(queue.name); |
|
|
|
} |
|
|
|
resolve(); |
|
|
|
} |
|
|
|
}); |
|
|
|
@ -97,8 +94,7 @@ function CustomSender(topic) { |
|
|
|
queues.push(requestTopic); |
|
|
|
} |
|
|
|
|
|
|
|
receiverClient = sbClient.createQueueClient(requestTopic); |
|
|
|
receiver = receiverClient.createReceiver(ReceiveMode.peekLock); |
|
|
|
receiver = sbClient.createReceiver(requestTopic, {receiveMode: 'peekLock'}); |
|
|
|
|
|
|
|
const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer()); |
|
|
|
|
|
|
|
@ -111,18 +107,18 @@ function CustomSender(topic) { |
|
|
|
const errorHandler = (error) => { |
|
|
|
logger.error('Failed to receive message from queue.', error); |
|
|
|
}; |
|
|
|
receiver.registerMessageHandler(messageHandler, errorHandler); |
|
|
|
receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) |
|
|
|
} catch (e) { |
|
|
|
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|
|
|
logger.error(e.stack); |
|
|
|
exit(-1); |
|
|
|
await exit(-1); |
|
|
|
} |
|
|
|
})(); |
|
|
|
|
|
|
|
async function createQueueIfNotExist(topic) { |
|
|
|
return new Promise((resolve, reject) => { |
|
|
|
serviceBusService.createQueueIfNotExists(topic, queueOptions, (err) => { |
|
|
|
if (err) { |
|
|
|
serviceBusService.createQueue(topic, queueOptions, (err) => { |
|
|
|
if (err && err.code !== "MessageEntityAlreadyExistsError") { |
|
|
|
reject(err); |
|
|
|
} else { |
|
|
|
resolve(); |
|
|
|
@ -139,10 +135,10 @@ function parseQueueProperties() { |
|
|
|
properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|
|
|
}); |
|
|
|
queueOptions = { |
|
|
|
DuplicateDetection: 'false', |
|
|
|
MaxSizeInMegabytes: properties['maxSizeInMb'], |
|
|
|
DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, |
|
|
|
LockDuration: `PT${properties['lockDurationInSec']}S` |
|
|
|
requiresDuplicateDetection: false, |
|
|
|
maxSizeInMegabytes: properties['maxSizeInMb'], |
|
|
|
defaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, |
|
|
|
lockDuration: `PT${properties['lockDurationInSec']}S` |
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
@ -161,24 +157,11 @@ async function exit(status) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (receiverClient) { |
|
|
|
try { |
|
|
|
await receiverClient.close(); |
|
|
|
} catch (e) { |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
senderMap.forEach((k, v) => { |
|
|
|
try { |
|
|
|
v.sender.close(); |
|
|
|
} catch (e) { |
|
|
|
|
|
|
|
} |
|
|
|
try { |
|
|
|
v.queueClient.close(); |
|
|
|
} catch (e) { |
|
|
|
|
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
@ -191,4 +174,4 @@ async function exit(status) { |
|
|
|
} |
|
|
|
logger.info('Azure Service Bus resources stopped.') |
|
|
|
process.exit(status); |
|
|
|
} |
|
|
|
} |
|
|
|
|