From 4fb309c37ea5854b1c7650c73b48db2bd86d5d56 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 21 Apr 2020 13:30:11 +0300 Subject: [PATCH 1/8] moved kafka from service.js to own module --- .../service/script/RemoteJsInvokeService.java | 10 +- .../src/main/resources/thingsboard.yml | 6 +- .../api/jsInvokeMessageProcessor.js | 23 ++-- .../config/custom-environment-variables.yml | 2 + msa/js-executor/config/default.yml | 2 + msa/js-executor/queue/kafka/kafkaTemplate.js | 117 ++++++++++++++++++ msa/js-executor/server.js | 94 ++------------ 7 files changed, 152 insertions(+), 102 deletions(-) create mode 100644 msa/js-executor/queue/kafka/kafkaTemplate.js diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 8d1f9d662a..baf54bfe2d 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -46,17 +46,17 @@ import java.util.concurrent.atomic.AtomicInteger; @Service public class RemoteJsInvokeService extends AbstractJsInvokeService { - @Value("${js.remote.max_requests_timeout}") + @Value("${queue.js.max_requests_timeout}") private long maxRequestsTimeout; @Getter - @Value("${js.remote.max_errors}") +// @Value("${queue.js.max_errors}") private int maxErrors; - @Value("${js.remote.max_black_list_duration_sec:60}") + @Value("${queue.js.max_black_list_duration_sec:60}") private int maxBlackListDurationSec; - @Value("${js.remote.stats.enabled:false}") + @Value("${queue.js.stats.enabled:false}") private boolean statsEnabled; private final AtomicInteger kafkaPushedMsgs = new AtomicInteger(0); @@ -65,7 +65,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); private final AtomicInteger kafkaTimeoutMsgs = new AtomicInteger(0); - @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") +// @Scheduled(fixedDelayString = "${queue.js.stats.print_interval_ms}") public void printStats() { if (statsEnabled) { int pushedMsgs = kafkaPushedMsgs.getAndSet(0); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e1296275cd..89ac618d6d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -415,7 +415,7 @@ state: persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}" js: - evaluator: "${JS_EVALUATOR:local}" # local/remote + evaluator: "${JS_EVALUATOR:remote}" # local/remote # Built-in JVM JavaScript environment properties local: # Use Sandboxed (secured) JVM JavaScript environment @@ -582,9 +582,9 @@ queue: print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}" js: # JS Eval request topic - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}" + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}" # JS Eval responses topic prefix that is combined with node id - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}" + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}" # JS Eval max pending requests max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" # JS Eval max request timeout diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index f0facf8cc1..4cf20b4227 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -31,6 +31,7 @@ const useSandbox = config.get('script.use_sandbox') === 'true'; const maxActiveScripts = Number(config.get('script.max_active_scripts')); function JsInvokeMessageProcessor(producer) { + console.log("Kafka Producer:", producer); this.producer = producer; this.executor = new JsExecutor(useSandbox); this.scriptMap = {}; @@ -144,17 +145,17 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, r JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) { var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); - this.producer.send( - { - topic: responseTopic, - messages: [ - { - key: scriptId, - value: rawResponse, - headers: headers - } - ] - } + this.producer.send(responseTopic, scriptId, rawResponse, headers + // { + // topic: responseTopic, + // messages: [ + // { + // key: scriptId, + // value: rawResponse, + // headers: headers + // } + // ] + // } ).then( () => {}, (err) => { diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 585dfe8adb..77e68b0429 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -14,6 +14,8 @@ # limitations under the License. # +service-type: "TB_SERVICE_TYPE" + kafka: request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" bootstrap: diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 1290a8a429..724901d171 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -14,6 +14,8 @@ # limitations under the License. # +service-type: "kafka" + kafka: request_topic: "js.eval.requests" bootstrap: diff --git a/msa/js-executor/queue/kafka/kafkaTemplate.js b/msa/js-executor/queue/kafka/kafkaTemplate.js new file mode 100644 index 0000000000..bb3f81c48d --- /dev/null +++ b/msa/js-executor/queue/kafka/kafkaTemplate.js @@ -0,0 +1,117 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +const { logLevel, Kafka } = require('kafkajs'); + +const config = require('config'), + JsInvokeMessageProcessor = require('../../api/jsInvokeMessageProcessor'), + logger = require('../../config/logger')._logger('main'), + KafkaJsWinstonLogCreator = require('../../config/logger').KafkaJsWinstonLogCreator; + +var kafkaClient; +var consumer; +var producer; + +function KafkaProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + return producer.send( + { + topic: responseTopic, + messages: [ + { + key: scriptId, + value: rawResponse, + headers: headers + } + ] + }); + } +} + +(async() => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + + const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); + const kafkaRequestTopic = config.get('kafka.request_topic'); + + logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); + logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); + + kafkaClient = new Kafka({ + brokers: kafkaBootstrapServers.split(','), + logLevel: logLevel.INFO, + logCreator: KafkaJsWinstonLogCreator + }); + + consumer = kafkaClient.consumer({ groupId: 'js-executor-group' }); + producer = kafkaClient.producer(); + const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); + await consumer.connect(); + await producer.connect(); + await consumer.subscribe({ topic: kafkaRequestTopic}); + + logger.info('Started ThingsBoard JavaScript Executor Microservice.'); + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + messageProcessor.onJsInvokeMessage(message); + }, + }); + + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + if (consumer) { + logger.info('Stopping Kafka Consumer...'); + var _consumer = consumer; + consumer = null; + try { + await _consumer.disconnect(); + logger.info('Kafka Consumer stopped.'); + await disconnectProducer(); + process.exit(status); + } catch (e) { + logger.info('Kafka Consumer stop error.'); + await disconnectProducer(); + process.exit(status); + } + } else { + process.exit(status); + } +} + +async function disconnectProducer() { + if (producer) { + logger.info('Stopping Kafka Producer...'); + var _producer = producer; + producer = null; + try { + await _producer.disconnect(); + logger.info('Kafka Producer stopped.'); + } catch (e) { + logger.info('Kafka Producer stop error.'); + } + } +} diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index f56e5bb766..486f9dc187 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -13,89 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -const { logLevel, Kafka } = require('kafkajs'); -const config = require('config'), - JsInvokeMessageProcessor = require('./api/jsInvokeMessageProcessor'), - logger = require('./config/logger')._logger('main'), - KafkaJsWinstonLogCreator = require('./config/logger').KafkaJsWinstonLogCreator; - -var kafkaClient; -var consumer; -var producer; - -(async() => { - try { - logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - - const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); - const kafkaRequestTopic = config.get('kafka.request_topic'); - - logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); - logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); - - kafkaClient = new Kafka({ - brokers: kafkaBootstrapServers.split(','), - logLevel: logLevel.INFO, - logCreator: KafkaJsWinstonLogCreator - }); - - consumer = kafkaClient.consumer({ groupId: 'js-executor-group' }); - producer = kafkaClient.producer(); - const messageProcessor = new JsInvokeMessageProcessor(producer); - await consumer.connect(); - await producer.connect(); - await consumer.subscribe({ topic: kafkaRequestTopic}); - - logger.info('Started ThingsBoard JavaScript Executor Microservice.'); - await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { - messageProcessor.onJsInvokeMessage(message); - }, - }); - - } catch (e) { - logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - logger.error(e.stack); - exit(-1); - } -})(); - -process.on('exit', () => { - exit(0); -}); - -async function exit(status) { - logger.info('Exiting with status: %d ...', status); - if (consumer) { - logger.info('Stopping Kafka Consumer...'); - var _consumer = consumer; - consumer = null; - try { - await _consumer.disconnect(); - logger.info('Kafka Consumer stopped.'); - await disconnectProducer(); - process.exit(status); - } catch (e) { - logger.info('Kafka Consumer stop error.'); - await disconnectProducer(); - process.exit(status); - } - } else { - process.exit(status); - } +const config = require('config'); + +const serviceType = config.get('service-type'); +switch (serviceType) { + case 'kafka': + require('./queue/kafka/kafkaTemplate'); + console.log('Used kafka template.'); + break; + default: + console.error('Unknown service type: ', serviceType); + process.exit(-1); } -async function disconnectProducer() { - if (producer) { - logger.info('Stopping Kafka Producer...'); - var _producer = producer; - producer = null; - try { - await _producer.disconnect(); - logger.info('Kafka Producer stopped.'); - } catch (e) { - logger.info('Kafka Producer stop error.'); - } - } -} From 0dc31fbdde07dada1e79e8d7ce1c57cab97324b9 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 28 Apr 2020 16:13:16 +0300 Subject: [PATCH 2/8] created awsSqs, pubSub, rabbitmq js-executors --- .../provider/AwsSqsMonolithQueueFactory.java | 36 +++- .../provider/PubSubMonolithQueueFactory.java | 36 +++- .../RabbitMqMonolithQueueFactory.java | 37 +++- .../api/jsInvokeMessageProcessor.js | 59 +++---- .../config/custom-environment-variables.yml | 23 ++- msa/js-executor/config/default.yml | 14 +- msa/js-executor/package.json | 3 + msa/js-executor/queue/awsSqsTemplate.js | 163 ++++++++++++++++++ .../queue/{kafka => }/kafkaTemplate.js | 61 ++++--- msa/js-executor/queue/pubSubTemplate.js | 87 ++++++++++ msa/js-executor/queue/rabbitmqTemplate.js | 149 ++++++++++++++++ msa/js-executor/server.js | 24 ++- 12 files changed, 616 insertions(+), 76 deletions(-) create mode 100644 msa/js-executor/queue/awsSqsTemplate.js rename msa/js-executor/queue/{kafka => }/kafkaTemplate.js (64%) create mode 100644 msa/js-executor/queue/pubSubTemplate.js create mode 100644 msa/js-executor/queue/rabbitmqTemplate.js diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index 76ff04c238..c86baf9c48 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -15,20 +15,26 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.*; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; @@ -40,6 +46,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='monolith'") @@ -52,6 +59,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbAwsSqsSettings sqsSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -65,7 +73,8 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbAwsSqsSettings sqsSettings, - TbAwsSqsQueueAttributes sqsQueueAttributes) { + TbAwsSqsQueueAttributes sqsQueueAttributes, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; @@ -73,6 +82,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.sqsSettings = sqsSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -144,8 +154,26 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + @Bean + public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index b1afc61dbd..760fa7708d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -15,10 +15,13 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; @@ -30,6 +33,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -39,11 +43,14 @@ import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import java.nio.charset.StandardCharsets; + @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'") public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { @@ -56,6 +63,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng private final TbQueueAdmin admin; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings, TbQueueCoreSettings coreSettings, @@ -63,7 +71,8 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, PartitionService partitionService, - TbServiceInfoProvider serviceInfoProvider) { + TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; @@ -72,6 +81,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng this.admin = new TbPubSubAdmin(pubSubSettings); this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; } @Override @@ -138,7 +148,25 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + @Bean + public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(admin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(admin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(admin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java index ff4a69e2e6..2435b30851 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java @@ -15,28 +15,37 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import java.nio.charset.StandardCharsets; + @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'") public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { @@ -48,6 +57,8 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbRabbitMqSettings rabbitMqSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; + private final TbQueueAdmin admin; public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, @@ -56,6 +67,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbRabbitMqSettings rabbitMqSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbQueueAdmin admin) { this.partitionService = partitionService; this.coreSettings = coreSettings; @@ -64,6 +76,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.rabbitMqSettings = rabbitMqSettings; + this.jsInvokeSettings = jsInvokeSettings; this.admin = admin; } @@ -130,7 +143,25 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + @Bean + public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(admin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } } diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index 4cf20b4227..f6da6886e7 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -19,7 +19,6 @@ const COMPILATION_ERROR = 0; const RUNTIME_ERROR = 1; const TIMEOUT_ERROR = 2; const UNRECOGNIZED = -1; -let headers; const config = require('config'), logger = require('../config/logger')._logger('JsInvokeMessageProcessor'), @@ -31,7 +30,7 @@ const useSandbox = config.get('script.use_sandbox') === 'true'; const maxActiveScripts = Number(config.get('script.max_active_scripts')); function JsInvokeMessageProcessor(producer) { - console.log("Kafka Producer:", producer); + console.log("Producer:", producer); this.producer = producer; this.executor = new JsExecutor(useSandbox); this.scriptMap = {}; @@ -39,26 +38,27 @@ function JsInvokeMessageProcessor(producer) { this.executedScriptsCounter = 0; } -JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { +JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(messageStr) { - var requestId; - var responseTopic; + let requestId; + let responseTopic; try { - var request = JSON.parse(message.value.toString('utf8')); - headers = message.headers; - var buf = message.headers['requestId']; + let message = JSON.parse(messageStr); + let request = JSON.parse(Buffer.from(message.data).toString('utf8')); + let headers = message.headers; + let buf = Buffer.from(headers.data['requestId']); requestId = Utils.UUIDFromBuffer(buf); - buf = message.headers['responseTopic']; + buf = Buffer.from(headers.data['responseTopic']); responseTopic = buf.toString('utf8'); logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic); if (request.compileRequest) { - this.processCompileRequest(requestId, responseTopic, request.compileRequest); + this.processCompileRequest(requestId, responseTopic, headers, request.compileRequest); } else if (request.invokeRequest) { - this.processInvokeRequest(requestId, responseTopic, request.invokeRequest); + this.processInvokeRequest(requestId, responseTopic, headers, request.invokeRequest); } else if (request.releaseRequest) { - this.processReleaseRequest(requestId, responseTopic, request.releaseRequest); + this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest); } else { logger.error('[%s] Unknown request recevied!', requestId); } @@ -69,7 +69,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { } } -JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, compileRequest) { +JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, headers, compileRequest) { var scriptId = getScriptId(compileRequest); logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); @@ -78,17 +78,17 @@ JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, r this.cacheScript(scriptId, script); var compileResponse = createCompileResponse(scriptId, true); logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, compileResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); }, (err) => { var compileResponse = createCompileResponse(scriptId, false, COMPILATION_ERROR, err); logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, compileResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); } ); } -JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, invokeRequest) { +JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, headers, invokeRequest) { var scriptId = getScriptId(invokeRequest); logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId); this.executedScriptsCounter++; @@ -104,7 +104,7 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, re (result) => { var invokeResponse = createInvokeResponse(result, true); logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); }, (err) => { var errorCode; @@ -115,19 +115,19 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, re } var invokeResponse = createInvokeResponse("", false, errorCode, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); } ) }, (err) => { var invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); } ); } -JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, releaseRequest) { +JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, headers, releaseRequest) { var scriptId = getScriptId(releaseRequest); logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId); if (this.scriptMap[scriptId]) { @@ -139,28 +139,17 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, r } var releaseResponse = createReleaseResponse(scriptId, true); logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, null, null, releaseResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, null, releaseResponse); } -JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) { +JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, headers, scriptId, compileResponse, invokeResponse, releaseResponse) { var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); - this.producer.send(responseTopic, scriptId, rawResponse, headers - // { - // topic: responseTopic, - // messages: [ - // { - // key: scriptId, - // value: rawResponse, - // headers: headers - // } - // ] - // } - ).then( + this.producer.send(responseTopic, scriptId, rawResponse, headers).then( () => {}, (err) => { if (err) { - logger.error('[%s] Failed to send response to kafka: %s', requestId, err.message); + logger.error('[%s] Failed to send response to queue: %s', requestId, err.message); logger.error(err.stack); } } diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 77e68b0429..3beab91b40 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -15,12 +15,33 @@ # service-type: "TB_SERVICE_TYPE" +request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" kafka: - request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" bootstrap: # Kafka Bootstrap Servers servers: "TB_KAFKA_SERVERS" + +pubsub: + project_id: "TB_QUEUE_PUBSUB_PROJECT_ID" + service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT" + +aws_sqs: + access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID" + secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY" + region: "TB_QUEUE_AWS_SQS_REGION" + +rabbitmq: + exchange_name: "TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME" + host: "TB_QUEUE_RABBIT_MQ_HOST" + port: "TB_QUEUE_RABBIT_MQ_PORT" + virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST" + username: "TB_QUEUE_RABBIT_MQ_USERNAME" + password: "TB_QUEUE_RABBIT_MQ_PASSWORD" + automatic_recovery_enabled: "TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED" + connection_timeout: "TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT" + handshake_timeout: "TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT" + logger: level: "LOGGER_LEVEL" path: "LOG_FOLDER" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 724901d171..ee38296b31 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -15,13 +15,25 @@ # service-type: "kafka" +request_topic: "js.eval.requests" kafka: - request_topic: "js.eval.requests" bootstrap: # Kafka Bootstrap Servers servers: "localhost:9092" +rabbitmq: + exchange_name: "" + host: "localhost" + port: "5672" + virtual_host: "/" + username: "YOUR_USERNAME" + password: "YOUR_PASSWORD" + automatic_recovery_enabled: "false" + connection_timeout: "60000" + handshake_timeout: "10000" + + logger: level: "info" path: "logs" diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index c87b62c6eb..6b2ac41a9c 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -15,6 +15,9 @@ "config": "^3.2.2", "js-yaml": "^3.12.0", "kafkajs": "^1.11.0", + "@google-cloud/pubsub": "^1.7.1", + "aws-sdk": "^2.663.0", + "amqplib": "^0.5.5", "long": "^4.0.0", "uuid-parse": "^1.0.0", "winston": "^3.0.0", diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js new file mode 100644 index 0000000000..6bd2f73510 --- /dev/null +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -0,0 +1,163 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('awsSqsTemplate'); + +const requestTopic = config.get('request_topic'); + +const accessKeyId = config.get('aws_sqs.access_key_id'); +const secretAccessKey = config.get('aws_sqs.secret_access_key'); +const region = config.get('aws_sqs.region'); +const AWS = require('aws-sdk'); + +let sqsClient; +let queueURL; +let responseTopics = new Map(); +let stopped = false; + +function AwsSqsProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + let msgBody = JSON.stringify( + { + key: scriptId, + data: [...rawResponse], + headers: headers + }); + + let responseQueueUrl = responseTopics.get(responseTopic); + + if (!responseQueueUrl) { + responseQueueUrl = await createQueue(responseTopic); + responseTopics.set(responseTopic, responseQueueUrl); + } + + let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId}; + + return new Promise((resolve, reject) => { + sqsClient.sendMessage(params, function (err, data) { + if (err) { + reject(err); + } else { + resolve(data); + } + }); + }); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + AWS.config.update({accessKeyId: accessKeyId, secretAccessKey: secretAccessKey, region: region}); + + sqsClient = new AWS.SQS({apiVersion: '2012-11-05'}); + + queueURL = await createQueue(requestTopic); + const messageProcessor = new JsInvokeMessageProcessor(new AwsSqsProducer()); + + const params = { + MaxNumberOfMessages: 10, + QueueUrl: queueURL, + WaitTimeSeconds: 0.025 + }; + while (!stopped) { + const messages = await new Promise((resolve, reject) => { + sqsClient.receiveMessage(params, function (err, data) { + if (err) { + reject(err); + } else { + resolve(data.Messages); + } + }); + }); + + if (messages && messages.length > 0) { + const entries = []; + + messages.forEach(message => { + entries.push({ + Id: message.MessageId, + ReceiptHandle: message.ReceiptHandle + }); + messageProcessor.onJsInvokeMessage(message.Body); + }); + + const deleteBatch = { + QueueUrl: queueURL, + Entries: entries + }; + sqsClient.deleteMessageBatch(deleteBatch, function (err, data) { + if (err) { + logger.error("Failed to delete messages from queue.", err.message); + } else { + //do nothing + } + }); + } + } + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +function createQueue(topic) { + let queueName = topic.replace(/\./g, '_') + '.fifo'; + let queueParams = { + QueueName: queueName, Attributes: { + FifoQueue: 'true', + ContentBasedDeduplication: 'true' + + } + }; + return new Promise((resolve, reject) => { + sqsClient.createQueue(queueParams, function (err, data) { + if (err) { + reject(err); + } else { + resolve(data.QueueUrl); + } + }); + }); +} + +process.on('exit', () => { + stopped = true; + logger.info('Aws Sqs client stopped.'); + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + if (sqsClient) { + logger.info('Stopping Aws Sqs client.') + try { + await sqsClient.close(); + logger.info('Aws Sqs client is stopped.') + process.exit(status); + } catch (e) { + logger.info('Aws Sqs client stop error.'); + process.exit(status); + } + } else { + process.exit(status); + } +} diff --git a/msa/js-executor/queue/kafka/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js similarity index 64% rename from msa/js-executor/queue/kafka/kafkaTemplate.js rename to msa/js-executor/queue/kafkaTemplate.js index bb3f81c48d..38e713a7db 100644 --- a/msa/js-executor/queue/kafka/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -13,39 +13,41 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -const { logLevel, Kafka } = require('kafkajs'); +const {logLevel, Kafka} = require('kafkajs'); const config = require('config'), - JsInvokeMessageProcessor = require('../../api/jsInvokeMessageProcessor'), - logger = require('../../config/logger')._logger('main'), - KafkaJsWinstonLogCreator = require('../../config/logger').KafkaJsWinstonLogCreator; + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('kafkaTemplate'), + KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; -var kafkaClient; -var consumer; -var producer; +let kafkaClient; +let consumer; +let producer; function KafkaProducer() { this.send = async (responseTopic, scriptId, rawResponse, headers) => { + let headersData = headers.data; + headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)])); return producer.send( - { - topic: responseTopic, - messages: [ - { - key: scriptId, - value: rawResponse, - headers: headers - } - ] - }); + { + topic: responseTopic, + messages: [ + { + key: scriptId, + value: rawResponse, + headers: headersData + } + ] + }); } } -(async() => { +(async () => { try { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); - const kafkaRequestTopic = config.get('kafka.request_topic'); + const kafkaRequestTopic = config.get('request_topic'); logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); @@ -56,17 +58,28 @@ function KafkaProducer() { logCreator: KafkaJsWinstonLogCreator }); - consumer = kafkaClient.consumer({ groupId: 'js-executor-group' }); + consumer = kafkaClient.consumer({groupId: 'js-executor-group'}); producer = kafkaClient.producer(); const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); await consumer.connect(); await producer.connect(); - await consumer.subscribe({ topic: kafkaRequestTopic}); + await consumer.subscribe({topic: kafkaRequestTopic}); logger.info('Started ThingsBoard JavaScript Executor Microservice.'); await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { - messageProcessor.onJsInvokeMessage(message); + eachMessage: async ({topic, partition, message}) => { + let headers = message.headers; + let key = message.key; + let data = message.value; + let msg = {}; + + headers = Object.fromEntries( + Object.entries(headers).map(([key, value]) => [key, [...value]])); + + msg.key = key.toString('utf8'); + msg.data = [...data]; + msg.headers = {data: headers} + messageProcessor.onJsInvokeMessage(JSON.stringify(msg)); }, }); @@ -85,7 +98,7 @@ async function exit(status) { logger.info('Exiting with status: %d ...', status); if (consumer) { logger.info('Stopping Kafka Consumer...'); - var _consumer = consumer; + let _consumer = consumer; consumer = null; try { await _consumer.disconnect(); diff --git a/msa/js-executor/queue/pubSubTemplate.js b/msa/js-executor/queue/pubSubTemplate.js new file mode 100644 index 0000000000..c8b39f7d6b --- /dev/null +++ b/msa/js-executor/queue/pubSubTemplate.js @@ -0,0 +1,87 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('pubSubTemplate'); +const {PubSub} = require('@google-cloud/pubsub'); + +const projectId = config.get('pubsub.project_id'); +const credentials = JSON.parse(config.get('pubsub.service_account')); +const requestTopic = config.get('request_topic'); + +let pubSubClient; + +function PubSubProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + let data = JSON.stringify( + { + key: scriptId, + data: [...rawResponse], + headers: headers + }); + let dataBuffer = Buffer.from(data); + return pubSubClient.topic(responseTopic).publish(dataBuffer); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + pubSubClient = new PubSub({projectId: projectId, credentials: credentials}); + + const subscription = pubSubClient.subscription(requestTopic); + + const messageProcessor = new JsInvokeMessageProcessor(new PubSubProducer()); + + const messageHandler = message => { + + messageProcessor.onJsInvokeMessage(message.data.toString('utf8')); + message.ack(); + }; + + subscription.on('message', messageHandler); + + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + if (pubSubClient) { + logger.info('Stopping Pub/Sub client.') + try { + await pubSubClient.close(); + logger.info('Pub/Sub client is stopped.') + process.exit(status); + } catch (e) { + logger.info('Pub/Sub client stop error.'); + process.exit(status); + } + } else { + process.exit(status); + } +} + diff --git a/msa/js-executor/queue/rabbitmqTemplate.js b/msa/js-executor/queue/rabbitmqTemplate.js new file mode 100644 index 0000000000..0b48cdd62f --- /dev/null +++ b/msa/js-executor/queue/rabbitmqTemplate.js @@ -0,0 +1,149 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('rabbitmqTemplate'); + +const requestTopic = config.get('request_topic'); +const amqp = require('amqplib/callback_api'); +let connection; +let channel; +let stopped = false; +const responseTopics = []; + +function RabbitMqProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + + if (!responseTopics.includes(responseTopic)) { + await createQueue(responseTopic); + responseTopics.push(responseTopic); + } + + let data = JSON.stringify( + { + key: scriptId, + data: [...rawResponse], + headers: headers + }); + let dataBuffer = Buffer.from(data); + channel.sendToQueue(responseTopic, dataBuffer); + return new Promise((resolve, reject) => { + channel.waitForConfirms((err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + + amqp.credentials.amqplain('admin', 'password'); + connection = await new Promise((resolve, reject) => { + amqp.connect('amqp://localhost:5672/', function (err, connection) { + if (err) { + reject(err); + } else { + resolve(connection); + } + }); + }); + + channel = await new Promise((resolve, reject) => { + connection.createConfirmChannel(function (err, channel) { + if (err) { + reject(err); + } else { + resolve(channel); + } + }); + }); + + await createQueue(requestTopic); + + const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer()); + + while (!stopped) { + let message = await new Promise((resolve, reject) => { + channel.get(requestTopic, {}, function (err, msg) { + if (err) { + reject(err); + } else { + resolve(msg); + } + }); + }); + + if (message) { + messageProcessor.onJsInvokeMessage(message.content.toString('utf8')); + channel.ack(message); + } + } + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +function createQueue(topic) { + let params = {durable: false}; + return new Promise((resolve, reject) => { + channel.assertQueue(topic, params, function (err, data) { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + + if (channel) { + logger.info('Stopping RabbitMq chanel.') + await channel.close(); + logger.info('RabbitMq chanel is stopped'); + } + + if (connection) { + logger.info('Stopping RabbitMq connection.') + try { + await connection.close(); + logger.info('RabbitMq client is connection.') + process.exit(status); + } catch (e) { + logger.info('RabbitMq connection stop error.'); + process.exit(status); + } + } else { + process.exit(status); + } +} \ No newline at end of file diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index 486f9dc187..bd84289200 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -14,16 +14,32 @@ * limitations under the License. */ -const config = require('config'); +const config = require('config'), logger = require('./config/logger')._logger('main'); const serviceType = config.get('service-type'); switch (serviceType) { case 'kafka': - require('./queue/kafka/kafkaTemplate'); - console.log('Used kafka template.'); + logger.info('Starting kafka template.'); + require('./queue/kafkaTemplate'); + logger.info('kafka template is started.'); + break; + case 'pubsub': + logger.info('Starting Pub/Sub template.') + require('./queue/pubSubTemplate'); + logger.info('Pub/Sub template is started.') + break; + case 'aws-sqs': + logger.info('Starting Aws Sqs template.') + require('./queue/awsSqsTemplate'); + logger.info('Aws Sqs template is started.') + break; + case 'rabbitmq': + logger.info('Starting RabbitMq template.') + require('./queue/rabbitmqTemplate'); + logger.info('RabbitMq template is started.') break; default: - console.error('Unknown service type: ', serviceType); + logger.error('Unknown service type: ', serviceType); process.exit(-1); } From d264a5206c35e1b9d1cbab38902c8262516db753 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 28 Apr 2020 17:05:53 +0300 Subject: [PATCH 3/8] revert RemoteJsInvokeService --- .../server/service/script/RemoteJsInvokeService.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index baf54bfe2d..fe2e7e8071 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -50,13 +50,13 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { private long maxRequestsTimeout; @Getter -// @Value("${queue.js.max_errors}") + @Value("${js.remote.max_errors}") private int maxErrors; - @Value("${queue.js.max_black_list_duration_sec:60}") + @Value("${js.remote.max_black_list_duration_sec:60}") private int maxBlackListDurationSec; - @Value("${queue.js.stats.enabled:false}") + @Value("${js.remote.stats.enabled:false}") private boolean statsEnabled; private final AtomicInteger kafkaPushedMsgs = new AtomicInteger(0); @@ -65,7 +65,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); private final AtomicInteger kafkaTimeoutMsgs = new AtomicInteger(0); -// @Scheduled(fixedDelayString = "${queue.js.stats.print_interval_ms}") + @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") public void printStats() { if (statsEnabled) { int pushedMsgs = kafkaPushedMsgs.getAndSet(0); From 214a6060669ec1c3e68bba8ce66510a114d56bd2 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 28 Apr 2020 17:07:25 +0300 Subject: [PATCH 4/8] revert thingsboard.yml --- application/src/main/resources/thingsboard.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 573f6b9462..bf82c586c3 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -606,9 +606,9 @@ queue: print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}" js: # JS Eval request topic - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}" + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}" # JS Eval responses topic prefix that is combined with node id - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}" + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}" # JS Eval max pending requests max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" # JS Eval max request timeout From ef35ee8c304d743c55c053a0996ce84793daf40e Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Apr 2020 09:41:21 +0300 Subject: [PATCH 5/8] added queue settings to js-executor --- .../config/custom-environment-variables.yml | 12 ++-- msa/js-executor/config/default.yml | 22 +++++-- msa/js-executor/package.json | 2 +- msa/js-executor/queue/awsSqsTemplate.js | 66 ++++++++++++++----- msa/js-executor/queue/kafkaTemplate.js | 57 +++++++++++++++- msa/js-executor/queue/pubSubTemplate.js | 62 +++++++++++++++++ msa/js-executor/queue/rabbitmqTemplate.js | 38 +++++++++-- 7 files changed, 224 insertions(+), 35 deletions(-) diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 3beab91b40..88d6341e04 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -17,30 +17,34 @@ service-type: "TB_SERVICE_TYPE" request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" +js: + response_poll_interval: "REMOTE_JS_RESPONSE_POLL_INTERVAL_MS" + kafka: bootstrap: # Kafka Bootstrap Servers servers: "TB_KAFKA_SERVERS" + replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" + topic-properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" pubsub: project_id: "TB_QUEUE_PUBSUB_PROJECT_ID" service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT" + queue-properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES" aws_sqs: access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID" secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY" region: "TB_QUEUE_AWS_SQS_REGION" + queue-properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES" rabbitmq: - exchange_name: "TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME" host: "TB_QUEUE_RABBIT_MQ_HOST" port: "TB_QUEUE_RABBIT_MQ_PORT" virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST" username: "TB_QUEUE_RABBIT_MQ_USERNAME" password: "TB_QUEUE_RABBIT_MQ_PASSWORD" - automatic_recovery_enabled: "TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED" - connection_timeout: "TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT" - handshake_timeout: "TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT" + queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES" logger: level: "LOGGER_LEVEL" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index ee38296b31..551aaabdf5 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -15,23 +15,31 @@ # service-type: "kafka" -request_topic: "js.eval.requests" +request_topic: "js_eval.requests" + +js: + response_poll_interval: "25" kafka: bootstrap: # Kafka Bootstrap Servers servers: "localhost:9092" + replication_factor: "1" + topic-properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" + +pubsub: + queue-properties: "ackDeadlineInSec:30;messageRetentionInSec:604800" + +aws_sqs: + queue-properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800" rabbitmq: - exchange_name: "" host: "localhost" port: "5672" virtual_host: "/" - username: "YOUR_USERNAME" - password: "YOUR_PASSWORD" - automatic_recovery_enabled: "false" - connection_timeout: "60000" - handshake_timeout: "10000" + username: "admin" + password: "password" + queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000" logger: diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 6b2ac41a9c..3b496c33e6 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -14,7 +14,7 @@ "dependencies": { "config": "^3.2.2", "js-yaml": "^3.12.0", - "kafkajs": "^1.11.0", + "kafkajs": "^1.12.0", "@google-cloud/pubsub": "^1.7.1", "aws-sdk": "^2.663.0", "amqplib": "^0.5.5", diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index 6bd2f73510..c74341d73d 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -26,10 +26,13 @@ const accessKeyId = config.get('aws_sqs.access_key_id'); const secretAccessKey = config.get('aws_sqs.secret_access_key'); const region = config.get('aws_sqs.region'); const AWS = require('aws-sdk'); +const queueProperties = config.get('aws_sqs.queue-properties'); +const poolInterval = config.get('js.response_poll_interval'); +let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'}; let sqsClient; -let queueURL; -let responseTopics = new Map(); +let requestQueueURL; +let queueUrls = new Map(); let stopped = false; function AwsSqsProducer() { @@ -41,11 +44,11 @@ function AwsSqsProducer() { headers: headers }); - let responseQueueUrl = responseTopics.get(responseTopic); + let responseQueueUrl = queueUrls.get(topicToSqsQueueName(responseTopic)); if (!responseQueueUrl) { responseQueueUrl = await createQueue(responseTopic); - responseTopics.set(responseTopic, responseQueueUrl); + queueUrls.set(responseTopic, responseQueueUrl); } let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId}; @@ -69,13 +72,27 @@ function AwsSqsProducer() { sqsClient = new AWS.SQS({apiVersion: '2012-11-05'}); - queueURL = await createQueue(requestTopic); + const queues = await getQueues(); + + queues.forEach(queueUrl => { + const delimiterPosition = queueUrl.lastIndexOf('/'); + const queueName = queueUrl.substring(delimiterPosition + 1); + queueUrls.set(queueName, queueUrl); + }) + + parseQueueProperties(); + + requestQueueURL = queueUrls.get(topicToSqsQueueName(requestTopic)); + if (!requestQueueURL) { + requestQueueURL = await createQueue(requestTopic); + } + const messageProcessor = new JsInvokeMessageProcessor(new AwsSqsProducer()); const params = { MaxNumberOfMessages: 10, - QueueUrl: queueURL, - WaitTimeSeconds: 0.025 + QueueUrl: requestQueueURL, + WaitTimeSeconds: poolInterval / 1000 }; while (!stopped) { const messages = await new Promise((resolve, reject) => { @@ -100,7 +117,7 @@ function AwsSqsProducer() { }); const deleteBatch = { - QueueUrl: queueURL, + QueueUrl: requestQueueURL, Entries: entries }; sqsClient.deleteMessageBatch(deleteBatch, function (err, data) { @@ -120,14 +137,9 @@ function AwsSqsProducer() { })(); function createQueue(topic) { - let queueName = topic.replace(/\./g, '_') + '.fifo'; - let queueParams = { - QueueName: queueName, Attributes: { - FifoQueue: 'true', - ContentBasedDeduplication: 'true' + let queueName = topicToSqsQueueName(topic); + let queueParams = {QueueName: queueName, Attributes: queueAttributes}; - } - }; return new Promise((resolve, reject) => { sqsClient.createQueue(queueParams, function (err, data) { if (err) { @@ -139,6 +151,30 @@ function createQueue(topic) { }); } +function getQueues() { + return new Promise((resolve, reject) => { + sqsClient.listQueues(function (err, data) { + if (err) { + reject(err); + } else { + resolve(data.QueueUrls); + } + }); + }); +} + +function topicToSqsQueueName(topic) { + return topic.replace(/\./g, '_') + '.fifo'; +} + +function parseQueueProperties() { + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + queueAttributes[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); +} + process.on('exit', () => { stopped = true; logger.info('Aws Sqs client stopped.'); diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index 38e713a7db..f0fde2952c 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -19,13 +19,28 @@ const config = require('config'), JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), logger = require('../config/logger')._logger('kafkaTemplate'), KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; +const replicationFactor = config.get('kafka.replication_factor'); +const topicProperties = config.get('kafka.topic-properties'); let kafkaClient; +let kafkaAdmin; let consumer; let producer; +const topics = []; +const configEntries = []; + function KafkaProducer() { this.send = async (responseTopic, scriptId, rawResponse, headers) => { + + if (!topics.includes(responseTopic)) { + let createResponseTopicResult = await createTopic(responseTopic); + topics.push(responseTopic); + if (createResponseTopicResult) { + logger.info('Created new topic: %s', requestTopic); + } + } + let headersData = headers.data; headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)])); return producer.send( @@ -47,10 +62,10 @@ function KafkaProducer() { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); - const kafkaRequestTopic = config.get('request_topic'); + const requestTopic = config.get('request_topic'); logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); - logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); + logger.info('Kafka Requests Topic: %s', requestTopic); kafkaClient = new Kafka({ brokers: kafkaBootstrapServers.split(','), @@ -58,12 +73,23 @@ function KafkaProducer() { logCreator: KafkaJsWinstonLogCreator }); + parseTopicProperties(); + + kafkaAdmin = kafkaClient.admin(); + await kafkaAdmin.connect(); + + let createRequestTopicResult = await createTopic(requestTopic); + + if (createRequestTopicResult) { + logger.info('Created new topic: %s', requestTopic); + } + consumer = kafkaClient.consumer({groupId: 'js-executor-group'}); producer = kafkaClient.producer(); const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); await consumer.connect(); await producer.connect(); - await consumer.subscribe({topic: kafkaRequestTopic}); + await consumer.subscribe({topic: requestTopic}); logger.info('Started ThingsBoard JavaScript Executor Microservice.'); await consumer.run({ @@ -90,12 +116,37 @@ function KafkaProducer() { } })(); +function createTopic(topic) { + return kafkaAdmin.createTopics({ + topics: [{ + topic: topic, + replicationFactor: replicationFactor, + configEntries: configEntries + }] + }); +} + +function parseTopicProperties() { + const props = topicProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + configEntries.push({name: p.substring(0, delimiterPosition), value: p.substring(delimiterPosition + 1)}); + }); +} + process.on('exit', () => { exit(0); }); async function exit(status) { logger.info('Exiting with status: %d ...', status); + + if (kafkaAdmin) { + logger.info('Stopping Kafka Admin...'); + await kafkaAdmin.disconnect(); + logger.info('Kafka Admin stopped.'); + } + if (consumer) { logger.info('Stopping Kafka Consumer...'); let _consumer = consumer; diff --git a/msa/js-executor/queue/pubSubTemplate.js b/msa/js-executor/queue/pubSubTemplate.js index c8b39f7d6b..708c1d56a4 100644 --- a/msa/js-executor/queue/pubSubTemplate.js +++ b/msa/js-executor/queue/pubSubTemplate.js @@ -24,11 +24,21 @@ const {PubSub} = require('@google-cloud/pubsub'); const projectId = config.get('pubsub.project_id'); const credentials = JSON.parse(config.get('pubsub.service_account')); const requestTopic = config.get('request_topic'); +const queueProperties = config.get('pubsub.queue-properties'); let pubSubClient; +const topics = []; +const subscriptions = []; +let queueProps = []; + function PubSubProducer() { this.send = async (responseTopic, scriptId, rawResponse, headers) => { + + if (!(subscriptions.includes(responseTopic) && topics.includes(requestTopic))) { + await createTopic(requestTopic); + } + let data = JSON.stringify( { key: scriptId, @@ -45,6 +55,28 @@ function PubSubProducer() { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); pubSubClient = new PubSub({projectId: projectId, credentials: credentials}); + parseQueueProperties(); + + const topicList = await pubSubClient.getTopics(); + + if (topicList) { + topicList[0].forEach(topic => { + topics.push(getName(topic.name)); + }); + } + + const subscriptionList = await pubSubClient.getSubscriptions(); + + if (subscriptionList) { + topicList[0].forEach(sub => { + subscriptions.push(getName(sub.name)); + }); + } + + if (!(subscriptions.includes(requestTopic) && topics.includes(requestTopic))) { + await createTopic(requestTopic); + } + const subscription = pubSubClient.subscription(requestTopic); const messageProcessor = new JsInvokeMessageProcessor(new PubSubProducer()); @@ -64,6 +96,36 @@ function PubSubProducer() { } })(); +async function createTopic(topic) { + if (!topics.includes(topic)) { + await pubSubClient.createTopic(topic); + topics.push(topic); + logger.info('Created new Pub/Sub topic: %s', topic); + } + await createSubscription(topic) +} + +async function createSubscription(topic) { + if (!subscriptions.includes(topic)) { + await pubSubClient.topic(topic).createSubscription(topic); + subscriptions.push(topic); + logger.info('Created new Pub/Sub subscription: %s', topic); + } +} + +function parseQueueProperties() { + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + queueProps[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); +} + +function getName(fullName) { + const delimiterPosition = fullName.lastIndexOf('/'); + return fullName.substring(delimiterPosition + 1); +} + process.on('exit', () => { exit(0); }); diff --git a/msa/js-executor/queue/rabbitmqTemplate.js b/msa/js-executor/queue/rabbitmqTemplate.js index 0b48cdd62f..e33409fc5a 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.js +++ b/msa/js-executor/queue/rabbitmqTemplate.js @@ -21,7 +21,17 @@ const config = require('config'), logger = require('../config/logger')._logger('rabbitmqTemplate'); const requestTopic = config.get('request_topic'); +const host = config.get('rabbitmq.host'); +const port = config.get('rabbitmq.port'); +const vhost = config.get('rabbitmq.virtual_host'); +const username = config.get('rabbitmq.username'); +const password = config.get('rabbitmq.password'); +const queueProperties = config.get('rabbitmq.queue-properties'); +const poolInterval = config.get('js.response_poll_interval'); + const amqp = require('amqplib/callback_api'); + +let queueParams = {durable: false, exclusive: false, autoDelete: false}; let connection; let channel; let stopped = false; @@ -58,10 +68,11 @@ function RabbitMqProducer() { (async () => { try { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + const url = `amqp://${host}:${port}${vhost}`; - amqp.credentials.amqplain('admin', 'password'); + amqp.credentials.amqplain(username, password); connection = await new Promise((resolve, reject) => { - amqp.connect('amqp://localhost:5672/', function (err, connection) { + amqp.connect(url, function (err, connection) { if (err) { reject(err); } else { @@ -80,6 +91,8 @@ function RabbitMqProducer() { }); }); + parseQueueProperties(); + await createQueue(requestTopic); const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer()); @@ -98,6 +111,8 @@ function RabbitMqProducer() { if (message) { messageProcessor.onJsInvokeMessage(message.content.toString('utf8')); channel.ack(message); + } else { + await sleep(poolInterval); } } } catch (e) { @@ -107,10 +122,17 @@ function RabbitMqProducer() { } })(); +function parseQueueProperties() { + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); +} + function createQueue(topic) { - let params = {durable: false}; return new Promise((resolve, reject) => { - channel.assertQueue(topic, params, function (err, data) { + channel.assertQueue(topic, queueParams, function (err, data) { if (err) { reject(err); } else { @@ -120,6 +142,12 @@ function createQueue(topic) { }); } +function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + process.on('exit', () => { exit(0); }); @@ -146,4 +174,4 @@ async function exit(status) { } else { process.exit(status); } -} \ No newline at end of file +} From cce6a44adc8a701ec0c08b7bfe91ff6f5388c138 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Apr 2020 10:14:05 +0300 Subject: [PATCH 6/8] refactored queue factories --- .../provider/AwsSqsMonolithQueueFactory.java | 5 +-- .../provider/AwsSqsTbCoreQueueFactory.java | 28 +++++++++++++- .../AwsSqsTbRuleEngineQueueFactory.java | 30 ++++++++++++++- .../provider/PubSubTbCoreQueueFactory.java | 38 ++++++++++++++++--- .../PubSubTbRuleEngineQueueFactory.java | 28 +++++++++++++- .../provider/RabbitMqTbCoreQueueFactory.java | 28 +++++++++++++- .../RabbitMqTbRuleEngineQueueFactory.java | 28 +++++++++++++- 7 files changed, 170 insertions(+), 15 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index c86baf9c48..c1d724c207 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -20,7 +20,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos.*; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; @@ -31,8 +32,6 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; -import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java index a7349091bd..4889a7182d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -30,11 +32,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @@ -44,6 +48,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-core'") @@ -55,6 +60,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -68,6 +74,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbAwsSqsQueueAttributes sqsQueueAttributes) { this.sqsSettings = sqsSettings; this.coreSettings = coreSettings; @@ -75,6 +82,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -133,8 +141,26 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java index 3056eced2c..cf43bc5fe4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -27,11 +29,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @@ -41,6 +45,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-rule-engine'") @@ -51,6 +56,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbAwsSqsSettings sqsSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -61,12 +67,14 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbAwsSqsSettings sqsSettings, - TbAwsSqsQueueAttributes sqsQueueAttributes) { + TbAwsSqsQueueAttributes sqsQueueAttributes, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.sqsSettings = sqsSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -113,8 +121,26 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index 0a23d58e46..7d85b5c3ba 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,21 +30,24 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; -import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; -import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") @@ -53,6 +58,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin jsExecutorAdmin; @@ -64,12 +70,14 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueTransportApiSettings transportApiSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.transportApiSettings = transportApiSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); @@ -127,8 +135,26 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java index 79501130ed..a6105fea0c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") @@ -52,6 +57,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory private final TbQueueRuleEngineSettings ruleEngineSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -63,12 +69,14 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); @@ -116,8 +124,26 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java index d84bbedbd2..65d582c4bc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -30,6 +32,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -40,10 +43,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") @@ -55,6 +60,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -68,6 +74,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbRabbitMqQueueArguments queueArguments) { this.rabbitMqSettings = rabbitMqSettings; this.coreSettings = coreSettings; @@ -75,6 +82,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); @@ -133,8 +141,26 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java index a18f427fab..26abf78164 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") @@ -52,6 +57,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbRabbitMqSettings rabbitMqSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -62,12 +68,14 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbRabbitMqSettings rabbitMqSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbRabbitMqQueueArguments queueArguments) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.rabbitMqSettings = rabbitMqSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); @@ -114,8 +122,26 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy From 8c413f8b9c0b5ba530e81cf4ddf02995c136fcc8 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Apr 2020 10:42:18 +0300 Subject: [PATCH 7/8] added queue params to pubsub js-executor --- msa/js-executor/queue/awsSqsTemplate.js | 2 +- msa/js-executor/queue/pubSubTemplate.js | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index c74341d73d..297509ca9b 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -32,7 +32,7 @@ const poolInterval = config.get('js.response_poll_interval'); let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'}; let sqsClient; let requestQueueURL; -let queueUrls = new Map(); +const queueUrls = new Map(); let stopped = false; function AwsSqsProducer() { diff --git a/msa/js-executor/queue/pubSubTemplate.js b/msa/js-executor/queue/pubSubTemplate.js index 708c1d56a4..0db0d19f05 100644 --- a/msa/js-executor/queue/pubSubTemplate.js +++ b/msa/js-executor/queue/pubSubTemplate.js @@ -30,7 +30,7 @@ let pubSubClient; const topics = []; const subscriptions = []; -let queueProps = []; +const queueProps = []; function PubSubProducer() { this.send = async (responseTopic, scriptId, rawResponse, headers) => { @@ -107,7 +107,12 @@ async function createTopic(topic) { async function createSubscription(topic) { if (!subscriptions.includes(topic)) { - await pubSubClient.topic(topic).createSubscription(topic); + await pubSubClient.createSubscription(topic, topic, { + topic: topic, + subscription: topic, + ackDeadlineSeconds: queueProps['ackDeadlineInSec'], + messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']} + }); subscriptions.push(topic); logger.info('Created new Pub/Sub subscription: %s', topic); } From 43b2eedbd5cc82caa7e5c66ca7125de7d4702b81 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Apr 2020 18:27:53 +0300 Subject: [PATCH 8/8] azure service bus js-executor --- .../azure/servicebus/TbServiceBusAdmin.java | 8 +- .../ServiceBusMonolithQueueFactory.java | 30 ++- .../ServiceBusTbCoreQueueFactory.java | 28 ++- .../ServiceBusTbRuleEngineQueueFactory.java | 28 ++- .../api/jsInvokeMessageProcessor.js | 3 +- .../config/custom-environment-variables.yml | 9 +- msa/js-executor/config/default.yml | 2 + msa/js-executor/package.json | 2 + msa/js-executor/queue/awsSqsTemplate.js | 4 +- msa/js-executor/queue/kafkaTemplate.js | 2 +- msa/js-executor/queue/pubSubTemplate.js | 4 +- msa/js-executor/queue/rabbitmqTemplate.js | 8 +- msa/js-executor/queue/serviceBusTemplate.js | 194 ++++++++++++++++++ msa/js-executor/server.js | 13 +- .../rule/engine/delay/TbMsgDelayNode.java | 2 +- 15 files changed, 314 insertions(+), 23 deletions(-) create mode 100644 msa/js-executor/queue/serviceBusTemplate.js diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java index 229d6b4244..f108de4a43 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.azure.servicebus; import com.microsoft.azure.servicebus.management.ManagementClient; import com.microsoft.azure.servicebus.management.QueueDescription; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException; import com.microsoft.azure.servicebus.primitives.ServiceBusException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.queue.TbQueueAdmin; @@ -71,7 +72,12 @@ public class TbServiceBusAdmin implements TbQueueAdmin { client.createQueue(queueDescription); queues.add(topic); } catch (ServiceBusException | InterruptedException e) { - log.error("Failed to create queue: [{}]", topic, e); + if (e instanceof MessagingEntityAlreadyExistsException) { + queues.add(topic); + log.info("[{}] queue already exists.", topic); + } else { + log.error("Failed to create queue: [{}]", topic, e); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java index 3c82e18d91..3db6496b7d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -35,19 +37,20 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") @@ -60,6 +63,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbServiceBusSettings serviceBusSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -73,6 +77,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbServiceBusSettings serviceBusSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; @@ -81,6 +86,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceBusSettings = serviceBusSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -152,8 +158,26 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java index bcb48630f9..a3ba577f06 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -34,15 +36,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") @@ -54,6 +59,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -67,6 +73,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.serviceBusSettings = serviceBusSettings; this.coreSettings = coreSettings; @@ -74,6 +81,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -132,8 +140,26 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java index 1f492596a1..e4f6dbfbb4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -32,15 +34,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") @@ -51,6 +56,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbServiceBusSettings serviceBusSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -61,12 +67,14 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbServiceBusSettings serviceBusSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.serviceBusSettings = serviceBusSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -113,8 +121,26 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index f6da6886e7..79ea505bcf 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -38,12 +38,11 @@ function JsInvokeMessageProcessor(producer) { this.executedScriptsCounter = 0; } -JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(messageStr) { +JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { let requestId; let responseTopic; try { - let message = JSON.parse(messageStr); let request = JSON.parse(Buffer.from(message.data).toString('utf8')); let headers = message.headers; let buf = Buffer.from(headers.data['requestId']); diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 88d6341e04..b290719739 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -14,7 +14,7 @@ # limitations under the License. # -service-type: "TB_SERVICE_TYPE" +service-type: "TB_SERVICE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" js: @@ -46,6 +46,13 @@ rabbitmq: password: "TB_QUEUE_RABBIT_MQ_PASSWORD" queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES" +service_bus: + namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME" + sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME" + sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY" + max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES" + queue-properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES" + logger: level: "LOGGER_LEVEL" path: "LOG_FOLDER" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 551aaabdf5..3155b051dc 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -41,6 +41,8 @@ rabbitmq: password: "password" queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000" +service_bus: + queue-properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800" logger: level: "info" diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 3b496c33e6..695b6e7faf 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -18,6 +18,8 @@ "@google-cloud/pubsub": "^1.7.1", "aws-sdk": "^2.663.0", "amqplib": "^0.5.5", + "@azure/service-bus": "^1.1.6", + "azure-sb": "^0.11.1", "long": "^4.0.0", "uuid-parse": "^1.0.0", "winston": "^3.0.0", diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index 297509ca9b..0396824af1 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -113,7 +113,7 @@ function AwsSqsProducer() { Id: message.MessageId, ReceiptHandle: message.ReceiptHandle }); - messageProcessor.onJsInvokeMessage(message.Body); + messageProcessor.onJsInvokeMessage(JSON.parse(message.Body)); }); const deleteBatch = { @@ -187,7 +187,7 @@ async function exit(status) { logger.info('Stopping Aws Sqs client.') try { await sqsClient.close(); - logger.info('Aws Sqs client is stopped.') + logger.info('Aws Sqs client stopped.') process.exit(status); } catch (e) { logger.info('Aws Sqs client stop error.'); diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index f0fde2952c..699ca7be00 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -105,7 +105,7 @@ function KafkaProducer() { msg.key = key.toString('utf8'); msg.data = [...data]; msg.headers = {data: headers} - messageProcessor.onJsInvokeMessage(JSON.stringify(msg)); + messageProcessor.onJsInvokeMessage(msg); }, }); diff --git a/msa/js-executor/queue/pubSubTemplate.js b/msa/js-executor/queue/pubSubTemplate.js index 0db0d19f05..17e1b56e1d 100644 --- a/msa/js-executor/queue/pubSubTemplate.js +++ b/msa/js-executor/queue/pubSubTemplate.js @@ -83,7 +83,7 @@ function PubSubProducer() { const messageHandler = message => { - messageProcessor.onJsInvokeMessage(message.data.toString('utf8')); + messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); message.ack(); }; @@ -141,7 +141,7 @@ async function exit(status) { logger.info('Stopping Pub/Sub client.') try { await pubSubClient.close(); - logger.info('Pub/Sub client is stopped.') + logger.info('Pub/Sub client stopped.') process.exit(status); } catch (e) { logger.info('Pub/Sub client stop error.'); diff --git a/msa/js-executor/queue/rabbitmqTemplate.js b/msa/js-executor/queue/rabbitmqTemplate.js index e33409fc5a..1a2905c3a0 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.js +++ b/msa/js-executor/queue/rabbitmqTemplate.js @@ -109,7 +109,7 @@ function RabbitMqProducer() { }); if (message) { - messageProcessor.onJsInvokeMessage(message.content.toString('utf8')); + messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); channel.ack(message); } else { await sleep(poolInterval); @@ -132,7 +132,7 @@ function parseQueueProperties() { function createQueue(topic) { return new Promise((resolve, reject) => { - channel.assertQueue(topic, queueParams, function (err, data) { + channel.assertQueue(topic, queueParams, function (err) { if (err) { reject(err); } else { @@ -158,14 +158,14 @@ async function exit(status) { if (channel) { logger.info('Stopping RabbitMq chanel.') await channel.close(); - logger.info('RabbitMq chanel is stopped'); + logger.info('RabbitMq chanel stopped'); } if (connection) { logger.info('Stopping RabbitMq connection.') try { await connection.close(); - logger.info('RabbitMq client is connection.') + logger.info('RabbitMq client connection.') process.exit(status); } catch (e) { logger.info('RabbitMq connection stop error.'); diff --git a/msa/js-executor/queue/serviceBusTemplate.js b/msa/js-executor/queue/serviceBusTemplate.js new file mode 100644 index 0000000000..034921afb7 --- /dev/null +++ b/msa/js-executor/queue/serviceBusTemplate.js @@ -0,0 +1,194 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('serviceBusTemplate'); +const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus"); +const azure = require('azure-sb'); + +const requestTopic = config.get('request_topic'); +const namespaceName = config.get('service_bus.namespace_name'); +const sasKeyName = config.get('service_bus.sas_key_name'); +const sasKey = config.get('service_bus.sas_key'); +const queueProperties = config.get('service_bus.queue-properties'); + +let sbClient; +let receiverClient; +let receiver; +let serviceBusService; + +let queueOptions = {}; +const queues = []; +const senderMap = new Map(); + +function ServiceBusProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + if (!queues.includes(requestTopic)) { + await createQueueIfNotExist(requestTopic); + queues.push(requestTopic); + } + + let customSender = senderMap.get(responseTopic); + + if (!customSender) { + customSender = new CustomSender(responseTopic); + senderMap.set(responseTopic, customSender); + } + + let data = { + key: scriptId, + data: [...rawResponse], + headers: headers + }; + + return customSender.send({body: data}); + } +} + +function CustomSender(topic) { + this.queueClient = sbClient.createQueueClient(topic); + this.sender = this.queueClient.createSender(); + + this.send = async (message) => { + return this.sender.send(message); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + + const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`; + sbClient = ServiceBusClient.createFromConnectionString(connectionString); + serviceBusService = azure.createServiceBusService(connectionString); + + parseQueueProperties(); + + await new Promise((resolve, reject) => { + serviceBusService.listQueues((err, data) => { + if (err) { + reject(err); + } else { + data.forEach(queue => { + queues.push(queue.QueueName); + }); + resolve(); + } + }); + }); + + if (!queues.includes(requestTopic)) { + await createQueueIfNotExist(requestTopic); + queues.push(requestTopic); + } + + receiverClient = sbClient.createQueueClient(requestTopic); + receiver = receiverClient.createReceiver(ReceiveMode.peekLock); + + const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer()); + + const messageHandler = async (message) => { + if (message) { + messageProcessor.onJsInvokeMessage(message.body); + await message.complete(); + } + }; + const errorHandler = (error) => { + logger.error('Failed to receive message from queue.', error); + }; + receiver.registerMessageHandler(messageHandler, errorHandler); + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +async function createQueueIfNotExist(topic) { + return new Promise((resolve, reject) => { + serviceBusService.createQueueIfNotExists(topic, queueOptions, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +function parseQueueProperties() { + let properties = {}; + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); + queueOptions = { + MaxSizeInMegabytes: properties['maxSizeInMb'], + DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, + LockDuration: `PT${properties['lockDurationInSec']}S` + }; +} + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + logger.info('Stopping Azure Service Bus resources...') + if (receiver) { + try { + await receiver.close(); + } catch (e) { + + } + } + + if (receiverClient) { + try { + await receiverClient.close(); + } catch (e) { + + } + } + + senderMap.forEach((k, v) => { + try { + v.sender.close(); + } catch (e) { + + } + try { + v.queueClient.close(); + } catch (e) { + + } + }); + + if (sbClient) { + try { + sbClient.close(); + } catch (e) { + + } + } + logger.info('Azure Service Bus resources stopped.') + process.exit(status); +} \ No newline at end of file diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index bd84289200..58361016c4 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -21,22 +21,27 @@ switch (serviceType) { case 'kafka': logger.info('Starting kafka template.'); require('./queue/kafkaTemplate'); - logger.info('kafka template is started.'); + logger.info('kafka template started.'); break; case 'pubsub': logger.info('Starting Pub/Sub template.') require('./queue/pubSubTemplate'); - logger.info('Pub/Sub template is started.') + logger.info('Pub/Sub template started.') break; case 'aws-sqs': logger.info('Starting Aws Sqs template.') require('./queue/awsSqsTemplate'); - logger.info('Aws Sqs template is started.') + logger.info('Aws Sqs template started.') break; case 'rabbitmq': logger.info('Starting RabbitMq template.') require('./queue/rabbitmqTemplate'); - logger.info('RabbitMq template is started.') + logger.info('RabbitMq template started.') + break; + case 'service-bus': + logger.info('Starting Azure Service Bus template.') + require('./queue/serviceBusTemplate'); + logger.info('Azure Service Bus template started.') break; default: logger.error('Unknown service type: ', serviceType); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 1807c5c4fe..e2bb07fb6f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -41,7 +41,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; name = "delay", configClazz = TbMsgDelayNodeConfiguration.class, nodeDescription = "Delays incoming message", - nodeDetails = "Delays messages for configurable period.", + nodeDetails = "Delays messages for configurable period. Please note, this node acknowledges the message from the current queue (message will be removed from queue)", icon = "pause", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeMsgDelayConfig"