From 43b2eedbd5cc82caa7e5c66ca7125de7d4702b81 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Apr 2020 18:27:53 +0300 Subject: [PATCH] azure service bus js-executor --- .../azure/servicebus/TbServiceBusAdmin.java | 8 +- .../ServiceBusMonolithQueueFactory.java | 30 ++- .../ServiceBusTbCoreQueueFactory.java | 28 ++- .../ServiceBusTbRuleEngineQueueFactory.java | 28 ++- .../api/jsInvokeMessageProcessor.js | 3 +- .../config/custom-environment-variables.yml | 9 +- msa/js-executor/config/default.yml | 2 + msa/js-executor/package.json | 2 + msa/js-executor/queue/awsSqsTemplate.js | 4 +- msa/js-executor/queue/kafkaTemplate.js | 2 +- msa/js-executor/queue/pubSubTemplate.js | 4 +- msa/js-executor/queue/rabbitmqTemplate.js | 8 +- msa/js-executor/queue/serviceBusTemplate.js | 194 ++++++++++++++++++ msa/js-executor/server.js | 13 +- .../rule/engine/delay/TbMsgDelayNode.java | 2 +- 15 files changed, 314 insertions(+), 23 deletions(-) create mode 100644 msa/js-executor/queue/serviceBusTemplate.js diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java index 229d6b4244..f108de4a43 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.azure.servicebus; import com.microsoft.azure.servicebus.management.ManagementClient; import com.microsoft.azure.servicebus.management.QueueDescription; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException; import com.microsoft.azure.servicebus.primitives.ServiceBusException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.queue.TbQueueAdmin; @@ -71,7 +72,12 @@ public class TbServiceBusAdmin implements TbQueueAdmin { client.createQueue(queueDescription); queues.add(topic); } catch (ServiceBusException | InterruptedException e) { - log.error("Failed to create queue: [{}]", topic, e); + if (e instanceof MessagingEntityAlreadyExistsException) { + queues.add(topic); + log.info("[{}] queue already exists.", topic); + } else { + log.error("Failed to create queue: [{}]", topic, e); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java index 3c82e18d91..3db6496b7d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -35,19 +37,20 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") @@ -60,6 +63,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbServiceBusSettings serviceBusSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -73,6 +77,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbServiceBusSettings serviceBusSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; @@ -81,6 +86,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceBusSettings = serviceBusSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -152,8 +158,26 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java index bcb48630f9..a3ba577f06 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -34,15 +36,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") @@ -54,6 +59,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -67,6 +73,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.serviceBusSettings = serviceBusSettings; this.coreSettings = coreSettings; @@ -74,6 +81,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -132,8 +140,26 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java index 1f492596a1..e4f6dbfbb4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -32,15 +34,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") @@ -51,6 +56,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbServiceBusSettings serviceBusSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -61,12 +67,14 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbServiceBusSettings serviceBusSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.serviceBusSettings = serviceBusSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -113,8 +121,26 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index f6da6886e7..79ea505bcf 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -38,12 +38,11 @@ function JsInvokeMessageProcessor(producer) { this.executedScriptsCounter = 0; } -JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(messageStr) { +JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { let requestId; let responseTopic; try { - let message = JSON.parse(messageStr); let request = JSON.parse(Buffer.from(message.data).toString('utf8')); let headers = message.headers; let buf = Buffer.from(headers.data['requestId']); diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 88d6341e04..b290719739 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -14,7 +14,7 @@ # limitations under the License. # -service-type: "TB_SERVICE_TYPE" +service-type: "TB_SERVICE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" js: @@ -46,6 +46,13 @@ rabbitmq: password: "TB_QUEUE_RABBIT_MQ_PASSWORD" queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES" +service_bus: + namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME" + sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME" + sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY" + max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES" + queue-properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES" + logger: level: "LOGGER_LEVEL" path: "LOG_FOLDER" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 551aaabdf5..3155b051dc 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -41,6 +41,8 @@ rabbitmq: password: "password" queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000" +service_bus: + queue-properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800" logger: level: "info" diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 3b496c33e6..695b6e7faf 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -18,6 +18,8 @@ "@google-cloud/pubsub": "^1.7.1", "aws-sdk": "^2.663.0", "amqplib": "^0.5.5", + "@azure/service-bus": "^1.1.6", + "azure-sb": "^0.11.1", "long": "^4.0.0", "uuid-parse": "^1.0.0", "winston": "^3.0.0", diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index 297509ca9b..0396824af1 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -113,7 +113,7 @@ function AwsSqsProducer() { Id: message.MessageId, ReceiptHandle: message.ReceiptHandle }); - messageProcessor.onJsInvokeMessage(message.Body); + messageProcessor.onJsInvokeMessage(JSON.parse(message.Body)); }); const deleteBatch = { @@ -187,7 +187,7 @@ async function exit(status) { logger.info('Stopping Aws Sqs client.') try { await sqsClient.close(); - logger.info('Aws Sqs client is stopped.') + logger.info('Aws Sqs client stopped.') process.exit(status); } catch (e) { logger.info('Aws Sqs client stop error.'); diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index f0fde2952c..699ca7be00 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -105,7 +105,7 @@ function KafkaProducer() { msg.key = key.toString('utf8'); msg.data = [...data]; msg.headers = {data: headers} - messageProcessor.onJsInvokeMessage(JSON.stringify(msg)); + messageProcessor.onJsInvokeMessage(msg); }, }); diff --git a/msa/js-executor/queue/pubSubTemplate.js b/msa/js-executor/queue/pubSubTemplate.js index 0db0d19f05..17e1b56e1d 100644 --- a/msa/js-executor/queue/pubSubTemplate.js +++ b/msa/js-executor/queue/pubSubTemplate.js @@ -83,7 +83,7 @@ function PubSubProducer() { const messageHandler = message => { - messageProcessor.onJsInvokeMessage(message.data.toString('utf8')); + messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); message.ack(); }; @@ -141,7 +141,7 @@ async function exit(status) { logger.info('Stopping Pub/Sub client.') try { await pubSubClient.close(); - logger.info('Pub/Sub client is stopped.') + logger.info('Pub/Sub client stopped.') process.exit(status); } catch (e) { logger.info('Pub/Sub client stop error.'); diff --git a/msa/js-executor/queue/rabbitmqTemplate.js b/msa/js-executor/queue/rabbitmqTemplate.js index e33409fc5a..1a2905c3a0 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.js +++ b/msa/js-executor/queue/rabbitmqTemplate.js @@ -109,7 +109,7 @@ function RabbitMqProducer() { }); if (message) { - messageProcessor.onJsInvokeMessage(message.content.toString('utf8')); + messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); channel.ack(message); } else { await sleep(poolInterval); @@ -132,7 +132,7 @@ function parseQueueProperties() { function createQueue(topic) { return new Promise((resolve, reject) => { - channel.assertQueue(topic, queueParams, function (err, data) { + channel.assertQueue(topic, queueParams, function (err) { if (err) { reject(err); } else { @@ -158,14 +158,14 @@ async function exit(status) { if (channel) { logger.info('Stopping RabbitMq chanel.') await channel.close(); - logger.info('RabbitMq chanel is stopped'); + logger.info('RabbitMq chanel stopped'); } if (connection) { logger.info('Stopping RabbitMq connection.') try { await connection.close(); - logger.info('RabbitMq client is connection.') + logger.info('RabbitMq client connection.') process.exit(status); } catch (e) { logger.info('RabbitMq connection stop error.'); diff --git a/msa/js-executor/queue/serviceBusTemplate.js b/msa/js-executor/queue/serviceBusTemplate.js new file mode 100644 index 0000000000..034921afb7 --- /dev/null +++ b/msa/js-executor/queue/serviceBusTemplate.js @@ -0,0 +1,194 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('serviceBusTemplate'); +const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus"); +const azure = require('azure-sb'); + +const requestTopic = config.get('request_topic'); +const namespaceName = config.get('service_bus.namespace_name'); +const sasKeyName = config.get('service_bus.sas_key_name'); +const sasKey = config.get('service_bus.sas_key'); +const queueProperties = config.get('service_bus.queue-properties'); + +let sbClient; +let receiverClient; +let receiver; +let serviceBusService; + +let queueOptions = {}; +const queues = []; +const senderMap = new Map(); + +function ServiceBusProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + if (!queues.includes(requestTopic)) { + await createQueueIfNotExist(requestTopic); + queues.push(requestTopic); + } + + let customSender = senderMap.get(responseTopic); + + if (!customSender) { + customSender = new CustomSender(responseTopic); + senderMap.set(responseTopic, customSender); + } + + let data = { + key: scriptId, + data: [...rawResponse], + headers: headers + }; + + return customSender.send({body: data}); + } +} + +function CustomSender(topic) { + this.queueClient = sbClient.createQueueClient(topic); + this.sender = this.queueClient.createSender(); + + this.send = async (message) => { + return this.sender.send(message); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + + const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`; + sbClient = ServiceBusClient.createFromConnectionString(connectionString); + serviceBusService = azure.createServiceBusService(connectionString); + + parseQueueProperties(); + + await new Promise((resolve, reject) => { + serviceBusService.listQueues((err, data) => { + if (err) { + reject(err); + } else { + data.forEach(queue => { + queues.push(queue.QueueName); + }); + resolve(); + } + }); + }); + + if (!queues.includes(requestTopic)) { + await createQueueIfNotExist(requestTopic); + queues.push(requestTopic); + } + + receiverClient = sbClient.createQueueClient(requestTopic); + receiver = receiverClient.createReceiver(ReceiveMode.peekLock); + + const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer()); + + const messageHandler = async (message) => { + if (message) { + messageProcessor.onJsInvokeMessage(message.body); + await message.complete(); + } + }; + const errorHandler = (error) => { + logger.error('Failed to receive message from queue.', error); + }; + receiver.registerMessageHandler(messageHandler, errorHandler); + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +async function createQueueIfNotExist(topic) { + return new Promise((resolve, reject) => { + serviceBusService.createQueueIfNotExists(topic, queueOptions, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +function parseQueueProperties() { + let properties = {}; + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); + queueOptions = { + MaxSizeInMegabytes: properties['maxSizeInMb'], + DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, + LockDuration: `PT${properties['lockDurationInSec']}S` + }; +} + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + logger.info('Stopping Azure Service Bus resources...') + if (receiver) { + try { + await receiver.close(); + } catch (e) { + + } + } + + if (receiverClient) { + try { + await receiverClient.close(); + } catch (e) { + + } + } + + senderMap.forEach((k, v) => { + try { + v.sender.close(); + } catch (e) { + + } + try { + v.queueClient.close(); + } catch (e) { + + } + }); + + if (sbClient) { + try { + sbClient.close(); + } catch (e) { + + } + } + logger.info('Azure Service Bus resources stopped.') + process.exit(status); +} \ No newline at end of file diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index bd84289200..58361016c4 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -21,22 +21,27 @@ switch (serviceType) { case 'kafka': logger.info('Starting kafka template.'); require('./queue/kafkaTemplate'); - logger.info('kafka template is started.'); + logger.info('kafka template started.'); break; case 'pubsub': logger.info('Starting Pub/Sub template.') require('./queue/pubSubTemplate'); - logger.info('Pub/Sub template is started.') + logger.info('Pub/Sub template started.') break; case 'aws-sqs': logger.info('Starting Aws Sqs template.') require('./queue/awsSqsTemplate'); - logger.info('Aws Sqs template is started.') + logger.info('Aws Sqs template started.') break; case 'rabbitmq': logger.info('Starting RabbitMq template.') require('./queue/rabbitmqTemplate'); - logger.info('RabbitMq template is started.') + logger.info('RabbitMq template started.') + break; + case 'service-bus': + logger.info('Starting Azure Service Bus template.') + require('./queue/serviceBusTemplate'); + logger.info('Azure Service Bus template started.') break; default: logger.error('Unknown service type: ', serviceType); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 1807c5c4fe..e2bb07fb6f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -41,7 +41,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; name = "delay", configClazz = TbMsgDelayNodeConfiguration.class, nodeDescription = "Delays incoming message", - nodeDetails = "Delays messages for configurable period.", + nodeDetails = "Delays messages for configurable period. Please note, this node acknowledges the message from the current queue (message will be removed from queue)", icon = "pause", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeMsgDelayConfig"