From 7ee2cdfe3e277a89260ddfba2c3a00ab11cb0146 Mon Sep 17 00:00:00 2001 From: Yevhen Bondarenko <56396344+YevhenBondarenko@users.noreply.github.com> Date: Wed, 13 May 2020 14:08:54 +0300 Subject: [PATCH] Fixes and refactoring (#2761) * fix sqs js executor and refactored RemoteJsInvokeService * added REMOTE_JS_MAX_REQUEST_TIMEOUT=20000 for aws-sqs, pubsub, service-bus docker environments * added REMOTE_JS_MAX_REQUEST_TIMEOUT=20000 for aws-sqs, pubsub, service-bus docker environments * refactored * docker-compose.pubsub.yml improvements * rabbitmq js executor improvements --- .../service/script/RemoteJsInvokeService.java | 38 +++++++++---------- docker/docker-compose.pubsub.yml | 4 +- docker/queue-aws-sqs.env | 1 + docker/queue-pubsub.env | 3 +- docker/queue-service-bus.env | 3 +- msa/js-executor/queue/awsSqsTemplate.js | 2 +- msa/js-executor/queue/rabbitmqTemplate.js | 3 +- 7 files changed, 28 insertions(+), 26 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 663cdc978d..8d9f144c8c 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -59,22 +59,22 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @Value("${js.remote.stats.enabled:false}") private boolean statsEnabled; - private final AtomicInteger kafkaPushedMsgs = new AtomicInteger(0); - private final AtomicInteger kafkaInvokeMsgs = new AtomicInteger(0); - private final AtomicInteger kafkaEvalMsgs = new AtomicInteger(0); - private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); - private final AtomicInteger kafkaTimeoutMsgs = new AtomicInteger(0); + private final AtomicInteger queuePushedMsgs = new AtomicInteger(0); + private final AtomicInteger queueInvokeMsgs = new AtomicInteger(0); + private final AtomicInteger queueEvalMsgs = new AtomicInteger(0); + private final AtomicInteger queueFailedMsgs = new AtomicInteger(0); + private final AtomicInteger queueTimeoutMsgs = new AtomicInteger(0); @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") public void printStats() { if (statsEnabled) { - int pushedMsgs = kafkaPushedMsgs.getAndSet(0); - int invokeMsgs = kafkaInvokeMsgs.getAndSet(0); - int evalMsgs = kafkaEvalMsgs.getAndSet(0); - int failed = kafkaFailedMsgs.getAndSet(0); - int timedOut = kafkaTimeoutMsgs.getAndSet(0); + int pushedMsgs = queuePushedMsgs.getAndSet(0); + int invokeMsgs = queueInvokeMsgs.getAndSet(0); + int evalMsgs = queueEvalMsgs.getAndSet(0); + int failed = queueFailedMsgs.getAndSet(0); + int timedOut = queueTimeoutMsgs.getAndSet(0); if (pushedMsgs > 0 || invokeMsgs > 0 || evalMsgs > 0 || failed > 0 || timedOut > 0) { - log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}] timedOut [{}]", + log.info("Queue JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}] timedOut [{}]", pushedMsgs, invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed, timedOut); } } @@ -116,19 +116,19 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { if (maxRequestsTimeout > 0) { future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); } - kafkaPushedMsgs.incrementAndGet(); + queuePushedMsgs.incrementAndGet(); Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(@Nullable TbProtoQueueMsg result) { - kafkaEvalMsgs.incrementAndGet(); + queueEvalMsgs.incrementAndGet(); } @Override public void onFailure(Throwable t) { if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) { - kafkaTimeoutMsgs.incrementAndGet(); + queueTimeoutMsgs.incrementAndGet(); } - kafkaFailedMsgs.incrementAndGet(); + queueFailedMsgs.incrementAndGet(); } }, MoreExecutors.directExecutor()); return Futures.transform(future, response -> { @@ -170,20 +170,20 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { if (maxRequestsTimeout > 0) { future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); } - kafkaPushedMsgs.incrementAndGet(); + queuePushedMsgs.incrementAndGet(); Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(@Nullable TbProtoQueueMsg result) { - kafkaInvokeMsgs.incrementAndGet(); + queueInvokeMsgs.incrementAndGet(); } @Override public void onFailure(Throwable t) { onScriptExecutionError(scriptId); if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) { - kafkaTimeoutMsgs.incrementAndGet(); + queueTimeoutMsgs.incrementAndGet(); } - kafkaFailedMsgs.incrementAndGet(); + queueFailedMsgs.incrementAndGet(); } }, MoreExecutors.directExecutor()); return Futures.transform(future, response -> { diff --git a/docker/docker-compose.pubsub.yml b/docker/docker-compose.pubsub.yml index 1ec178ee46..93b4047a5d 100644 --- a/docker/docker-compose.pubsub.yml +++ b/docker/docker-compose.pubsub.yml @@ -19,10 +19,10 @@ version: '2.2' services: tb-js-executor: env_file: - - queue-pubsub.env.env + - queue-pubsub.env tb-core1: env_file: - - queue-pubsub.env.env + - queue-pubsub.env depends_on: - zookeeper - redis diff --git a/docker/queue-aws-sqs.env b/docker/queue-aws-sqs.env index 1cb9fd65dd..07b9c69d90 100644 --- a/docker/queue-aws-sqs.env +++ b/docker/queue-aws-sqs.env @@ -2,3 +2,4 @@ TB_QUEUE_TYPE=aws-sqs TB_QUEUE_AWS_SQS_ACCESS_KEY_ID=YOUR_KEY TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY=YOUR_SECRET TB_QUEUE_AWS_SQS_REGION=YOUR_REGION +REMOTE_JS_MAX_REQUEST_TIMEOUT=60000 diff --git a/docker/queue-pubsub.env b/docker/queue-pubsub.env index 3bc62a1560..1a84f09cf0 100644 --- a/docker/queue-pubsub.env +++ b/docker/queue-pubsub.env @@ -1,3 +1,4 @@ TB_QUEUE_TYPE=pubsub TB_QUEUE_PUBSUB_PROJECT_ID=YOUR_PROJECT_ID -TB_QUEUE_PUBSUB_SERVICE_ACCOUNT=YOUR_SERVICE_ACCOUNT \ No newline at end of file +TB_QUEUE_PUBSUB_SERVICE_ACCOUNT=YOUR_SERVICE_ACCOUNT +REMOTE_JS_MAX_REQUEST_TIMEOUT=60000 diff --git a/docker/queue-service-bus.env b/docker/queue-service-bus.env index 07bd0dc061..24c90f660b 100644 --- a/docker/queue-service-bus.env +++ b/docker/queue-service-bus.env @@ -1,4 +1,5 @@ TB_QUEUE_TYPE=service-bus TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME=YOUR_NAMESPACE_NAME TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME=YOUR_SAS_KEY_NAME -TB_QUEUE_SERVICE_BUS_SAS_KEY=YOUR_SAS_KEY \ No newline at end of file +TB_QUEUE_SERVICE_BUS_SAS_KEY=YOUR_SAS_KEY +REMOTE_JS_MAX_REQUEST_TIMEOUT=60000 diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index a74d9d5b57..52d4997f33 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -100,7 +100,7 @@ function AwsSqsProducer() { const params = { MaxNumberOfMessages: 10, QueueUrl: requestQueueURL, - WaitTimeSeconds: poolInterval / 1000 + WaitTimeSeconds: pollInterval / 1000 }; while (!stopped) { let pollStartTs = new Date().getTime(); diff --git a/msa/js-executor/queue/rabbitmqTemplate.js b/msa/js-executor/queue/rabbitmqTemplate.js index 2aeb5c02db..b3676a19ff 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.js +++ b/msa/js-executor/queue/rabbitmqTemplate.js @@ -68,9 +68,8 @@ function RabbitMqProducer() { (async () => { try { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - const url = `amqp://${host}:${port}${vhost}`; + const url = `amqp://${username}:${password}@${host}:${port}${vhost}`; - amqp.credentials.amqplain(username, password); connection = await new Promise((resolve, reject) => { amqp.connect(url, function (err, connection) { if (err) {