diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java index 229d6b4244..f108de4a43 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.azure.servicebus; import com.microsoft.azure.servicebus.management.ManagementClient; import com.microsoft.azure.servicebus.management.QueueDescription; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException; import com.microsoft.azure.servicebus.primitives.ServiceBusException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.queue.TbQueueAdmin; @@ -71,7 +72,12 @@ public class TbServiceBusAdmin implements TbQueueAdmin { client.createQueue(queueDescription); queues.add(topic); } catch (ServiceBusException | InterruptedException e) { - log.error("Failed to create queue: [{}]", topic, e); + if (e instanceof MessagingEntityAlreadyExistsException) { + queues.add(topic); + log.info("[{}] queue already exists.", topic); + } else { + log.error("Failed to create queue: [{}]", topic, e); + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index 76ff04c238..c1d724c207 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -15,20 +15,25 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; @@ -40,6 +45,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='monolith'") @@ -52,6 +58,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbAwsSqsSettings sqsSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -65,7 +72,8 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbAwsSqsSettings sqsSettings, - TbAwsSqsQueueAttributes sqsQueueAttributes) { + TbAwsSqsQueueAttributes sqsQueueAttributes, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; @@ -73,6 +81,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.sqsSettings = sqsSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -144,8 +153,26 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + @Bean + public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java index a7349091bd..4889a7182d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -30,11 +32,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @@ -44,6 +48,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-core'") @@ -55,6 +60,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -68,6 +74,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbAwsSqsQueueAttributes sqsQueueAttributes) { this.sqsSettings = sqsSettings; this.coreSettings = coreSettings; @@ -75,6 +82,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -133,8 +141,26 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java index 3056eced2c..cf43bc5fe4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -27,11 +29,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @@ -41,6 +45,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-rule-engine'") @@ -51,6 +56,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbAwsSqsSettings sqsSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -61,12 +67,14 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbAwsSqsSettings sqsSettings, - TbAwsSqsQueueAttributes sqsQueueAttributes) { + TbAwsSqsQueueAttributes sqsQueueAttributes, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.sqsSettings = sqsSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -113,8 +121,26 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index af0b276c1a..29a60af1e2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -15,10 +15,13 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; @@ -30,6 +33,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -40,12 +44,14 @@ import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'") @@ -58,6 +64,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng private final TbQueueTransportNotificationSettings transportNotificationSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -72,7 +79,8 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng TbQueueTransportNotificationSettings transportNotificationSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, - TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { + TbPubSubSubscriptionSettings pubSubSubscriptionSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; @@ -86,6 +94,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings()); this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings()); + this.jsInvokeSettings = jsInvokeSettings; } @Override @@ -152,8 +161,26 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + @Bean + public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index 0a23d58e46..7d85b5c3ba 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,21 +30,24 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; -import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; -import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") @@ -53,6 +58,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin jsExecutorAdmin; @@ -64,12 +70,14 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueTransportApiSettings transportApiSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.transportApiSettings = transportApiSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); @@ -127,8 +135,26 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java index 79501130ed..a6105fea0c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") @@ -52,6 +57,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory private final TbQueueRuleEngineSettings ruleEngineSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -63,12 +69,14 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); @@ -116,8 +124,26 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java index 72f45e5276..1f812ea13c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java @@ -15,15 +15,19 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -34,12 +38,14 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'") @@ -58,6 +64,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE private final TbQueueAdmin jsExecutorAdmin; private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin notificationAdmin; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, @@ -65,7 +72,8 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbRabbitMqSettings rabbitMqSettings, - TbRabbitMqQueueArguments queueArguments) { + TbRabbitMqQueueArguments queueArguments, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; @@ -79,6 +87,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs()); this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs()); this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); + this.jsInvokeSettings = jsInvokeSettings; } @Override @@ -144,8 +153,26 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + @Bean + public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java index d84bbedbd2..65d582c4bc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -30,6 +32,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -40,10 +43,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") @@ -55,6 +60,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -68,6 +74,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbRabbitMqQueueArguments queueArguments) { this.rabbitMqSettings = rabbitMqSettings; this.coreSettings = coreSettings; @@ -75,6 +82,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); @@ -133,8 +141,26 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java index a18f427fab..26abf78164 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") @@ -52,6 +57,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbRabbitMqSettings rabbitMqSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -62,12 +68,14 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbRabbitMqSettings rabbitMqSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbRabbitMqQueueArguments queueArguments) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.rabbitMqSettings = rabbitMqSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); @@ -114,8 +122,26 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java index 3c82e18d91..3db6496b7d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -35,19 +37,20 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") @@ -60,6 +63,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbServiceBusSettings serviceBusSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -73,6 +77,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul TbQueueTransportApiSettings transportApiSettings, TbQueueTransportNotificationSettings transportNotificationSettings, TbServiceBusSettings serviceBusSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; @@ -81,6 +86,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul this.transportApiSettings = transportApiSettings; this.transportNotificationSettings = transportNotificationSettings; this.serviceBusSettings = serviceBusSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -152,8 +158,26 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java index bcb48630f9..a3ba577f06 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -34,15 +36,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") @@ -54,6 +59,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -67,6 +73,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.serviceBusSettings = serviceBusSettings; this.coreSettings = coreSettings; @@ -74,6 +81,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -132,8 +140,26 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java index 1f492596a1..e4f6dbfbb4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -32,15 +34,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") @@ -51,6 +56,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbServiceBusSettings serviceBusSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -61,12 +67,14 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbServiceBusSettings serviceBusSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbServiceBusQueueConfigs serviceBusQueueConfigs) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.serviceBusSettings = serviceBusSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); @@ -113,8 +121,26 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index f0facf8cc1..79ea505bcf 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -19,7 +19,6 @@ const COMPILATION_ERROR = 0; const RUNTIME_ERROR = 1; const TIMEOUT_ERROR = 2; const UNRECOGNIZED = -1; -let headers; const config = require('config'), logger = require('../config/logger')._logger('JsInvokeMessageProcessor'), @@ -31,6 +30,7 @@ const useSandbox = config.get('script.use_sandbox') === 'true'; const maxActiveScripts = Number(config.get('script.max_active_scripts')); function JsInvokeMessageProcessor(producer) { + console.log("Producer:", producer); this.producer = producer; this.executor = new JsExecutor(useSandbox); this.scriptMap = {}; @@ -40,24 +40,24 @@ function JsInvokeMessageProcessor(producer) { JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { - var requestId; - var responseTopic; + let requestId; + let responseTopic; try { - var request = JSON.parse(message.value.toString('utf8')); - headers = message.headers; - var buf = message.headers['requestId']; + let request = JSON.parse(Buffer.from(message.data).toString('utf8')); + let headers = message.headers; + let buf = Buffer.from(headers.data['requestId']); requestId = Utils.UUIDFromBuffer(buf); - buf = message.headers['responseTopic']; + buf = Buffer.from(headers.data['responseTopic']); responseTopic = buf.toString('utf8'); logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic); if (request.compileRequest) { - this.processCompileRequest(requestId, responseTopic, request.compileRequest); + this.processCompileRequest(requestId, responseTopic, headers, request.compileRequest); } else if (request.invokeRequest) { - this.processInvokeRequest(requestId, responseTopic, request.invokeRequest); + this.processInvokeRequest(requestId, responseTopic, headers, request.invokeRequest); } else if (request.releaseRequest) { - this.processReleaseRequest(requestId, responseTopic, request.releaseRequest); + this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest); } else { logger.error('[%s] Unknown request recevied!', requestId); } @@ -68,7 +68,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { } } -JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, compileRequest) { +JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, headers, compileRequest) { var scriptId = getScriptId(compileRequest); logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); @@ -77,17 +77,17 @@ JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, r this.cacheScript(scriptId, script); var compileResponse = createCompileResponse(scriptId, true); logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, compileResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); }, (err) => { var compileResponse = createCompileResponse(scriptId, false, COMPILATION_ERROR, err); logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, compileResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); } ); } -JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, invokeRequest) { +JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, headers, invokeRequest) { var scriptId = getScriptId(invokeRequest); logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId); this.executedScriptsCounter++; @@ -103,7 +103,7 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, re (result) => { var invokeResponse = createInvokeResponse(result, true); logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); }, (err) => { var errorCode; @@ -114,19 +114,19 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, re } var invokeResponse = createInvokeResponse("", false, errorCode, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); } ) }, (err) => { var invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err); logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse); } ); } -JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, releaseRequest) { +JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, headers, releaseRequest) { var scriptId = getScriptId(releaseRequest); logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId); if (this.scriptMap[scriptId]) { @@ -138,28 +138,17 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, r } var releaseResponse = createReleaseResponse(scriptId, true); logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId); - this.sendResponse(requestId, responseTopic, scriptId, null, null, releaseResponse); + this.sendResponse(requestId, responseTopic, headers, scriptId, null, null, releaseResponse); } -JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) { +JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, headers, scriptId, compileResponse, invokeResponse, releaseResponse) { var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); - this.producer.send( - { - topic: responseTopic, - messages: [ - { - key: scriptId, - value: rawResponse, - headers: headers - } - ] - } - ).then( + this.producer.send(responseTopic, scriptId, rawResponse, headers).then( () => {}, (err) => { if (err) { - logger.error('[%s] Failed to send response to kafka: %s', requestId, err.message); + logger.error('[%s] Failed to send response to queue: %s', requestId, err.message); logger.error(err.stack); } } diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 585dfe8adb..b290719739 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -14,11 +14,45 @@ # limitations under the License. # +service-type: "TB_SERVICE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) +request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" + +js: + response_poll_interval: "REMOTE_JS_RESPONSE_POLL_INTERVAL_MS" + kafka: - request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" bootstrap: # Kafka Bootstrap Servers servers: "TB_KAFKA_SERVERS" + replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" + topic-properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" + +pubsub: + project_id: "TB_QUEUE_PUBSUB_PROJECT_ID" + service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT" + queue-properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES" + +aws_sqs: + access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID" + secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY" + region: "TB_QUEUE_AWS_SQS_REGION" + queue-properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES" + +rabbitmq: + host: "TB_QUEUE_RABBIT_MQ_HOST" + port: "TB_QUEUE_RABBIT_MQ_PORT" + virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST" + username: "TB_QUEUE_RABBIT_MQ_USERNAME" + password: "TB_QUEUE_RABBIT_MQ_PASSWORD" + queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES" + +service_bus: + namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME" + sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME" + sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY" + max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES" + queue-properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES" + logger: level: "LOGGER_LEVEL" path: "LOG_FOLDER" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 1290a8a429..3155b051dc 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -14,11 +14,35 @@ # limitations under the License. # +service-type: "kafka" +request_topic: "js_eval.requests" + +js: + response_poll_interval: "25" + kafka: - request_topic: "js.eval.requests" bootstrap: # Kafka Bootstrap Servers servers: "localhost:9092" + replication_factor: "1" + topic-properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" + +pubsub: + queue-properties: "ackDeadlineInSec:30;messageRetentionInSec:604800" + +aws_sqs: + queue-properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800" + +rabbitmq: + host: "localhost" + port: "5672" + virtual_host: "/" + username: "admin" + password: "password" + queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000" + +service_bus: + queue-properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800" logger: level: "info" diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 0cdafffef1..3dadac4f84 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -14,7 +14,12 @@ "dependencies": { "config": "^3.2.2", "js-yaml": "^3.12.0", - "kafkajs": "^1.11.0", + "kafkajs": "^1.12.0", + "@google-cloud/pubsub": "^1.7.1", + "aws-sdk": "^2.663.0", + "amqplib": "^0.5.5", + "@azure/service-bus": "^1.1.6", + "azure-sb": "^0.11.1", "long": "^4.0.0", "uuid-parse": "^1.0.0", "winston": "^3.0.0", diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js new file mode 100644 index 0000000000..0396824af1 --- /dev/null +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -0,0 +1,199 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('awsSqsTemplate'); + +const requestTopic = config.get('request_topic'); + +const accessKeyId = config.get('aws_sqs.access_key_id'); +const secretAccessKey = config.get('aws_sqs.secret_access_key'); +const region = config.get('aws_sqs.region'); +const AWS = require('aws-sdk'); +const queueProperties = config.get('aws_sqs.queue-properties'); +const poolInterval = config.get('js.response_poll_interval'); + +let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'}; +let sqsClient; +let requestQueueURL; +const queueUrls = new Map(); +let stopped = false; + +function AwsSqsProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + let msgBody = JSON.stringify( + { + key: scriptId, + data: [...rawResponse], + headers: headers + }); + + let responseQueueUrl = queueUrls.get(topicToSqsQueueName(responseTopic)); + + if (!responseQueueUrl) { + responseQueueUrl = await createQueue(responseTopic); + queueUrls.set(responseTopic, responseQueueUrl); + } + + let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId}; + + return new Promise((resolve, reject) => { + sqsClient.sendMessage(params, function (err, data) { + if (err) { + reject(err); + } else { + resolve(data); + } + }); + }); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + AWS.config.update({accessKeyId: accessKeyId, secretAccessKey: secretAccessKey, region: region}); + + sqsClient = new AWS.SQS({apiVersion: '2012-11-05'}); + + const queues = await getQueues(); + + queues.forEach(queueUrl => { + const delimiterPosition = queueUrl.lastIndexOf('/'); + const queueName = queueUrl.substring(delimiterPosition + 1); + queueUrls.set(queueName, queueUrl); + }) + + parseQueueProperties(); + + requestQueueURL = queueUrls.get(topicToSqsQueueName(requestTopic)); + if (!requestQueueURL) { + requestQueueURL = await createQueue(requestTopic); + } + + const messageProcessor = new JsInvokeMessageProcessor(new AwsSqsProducer()); + + const params = { + MaxNumberOfMessages: 10, + QueueUrl: requestQueueURL, + WaitTimeSeconds: poolInterval / 1000 + }; + while (!stopped) { + const messages = await new Promise((resolve, reject) => { + sqsClient.receiveMessage(params, function (err, data) { + if (err) { + reject(err); + } else { + resolve(data.Messages); + } + }); + }); + + if (messages && messages.length > 0) { + const entries = []; + + messages.forEach(message => { + entries.push({ + Id: message.MessageId, + ReceiptHandle: message.ReceiptHandle + }); + messageProcessor.onJsInvokeMessage(JSON.parse(message.Body)); + }); + + const deleteBatch = { + QueueUrl: requestQueueURL, + Entries: entries + }; + sqsClient.deleteMessageBatch(deleteBatch, function (err, data) { + if (err) { + logger.error("Failed to delete messages from queue.", err.message); + } else { + //do nothing + } + }); + } + } + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +function createQueue(topic) { + let queueName = topicToSqsQueueName(topic); + let queueParams = {QueueName: queueName, Attributes: queueAttributes}; + + return new Promise((resolve, reject) => { + sqsClient.createQueue(queueParams, function (err, data) { + if (err) { + reject(err); + } else { + resolve(data.QueueUrl); + } + }); + }); +} + +function getQueues() { + return new Promise((resolve, reject) => { + sqsClient.listQueues(function (err, data) { + if (err) { + reject(err); + } else { + resolve(data.QueueUrls); + } + }); + }); +} + +function topicToSqsQueueName(topic) { + return topic.replace(/\./g, '_') + '.fifo'; +} + +function parseQueueProperties() { + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + queueAttributes[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); +} + +process.on('exit', () => { + stopped = true; + logger.info('Aws Sqs client stopped.'); + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + if (sqsClient) { + logger.info('Stopping Aws Sqs client.') + try { + await sqsClient.close(); + logger.info('Aws Sqs client stopped.') + process.exit(status); + } catch (e) { + logger.info('Aws Sqs client stop error.'); + process.exit(status); + } + } else { + process.exit(status); + } +} diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js new file mode 100644 index 0000000000..699ca7be00 --- /dev/null +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -0,0 +1,181 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +const {logLevel, Kafka} = require('kafkajs'); + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('kafkaTemplate'), + KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; +const replicationFactor = config.get('kafka.replication_factor'); +const topicProperties = config.get('kafka.topic-properties'); + +let kafkaClient; +let kafkaAdmin; +let consumer; +let producer; + +const topics = []; +const configEntries = []; + +function KafkaProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + + if (!topics.includes(responseTopic)) { + let createResponseTopicResult = await createTopic(responseTopic); + topics.push(responseTopic); + if (createResponseTopicResult) { + logger.info('Created new topic: %s', requestTopic); + } + } + + let headersData = headers.data; + headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)])); + return producer.send( + { + topic: responseTopic, + messages: [ + { + key: scriptId, + value: rawResponse, + headers: headersData + } + ] + }); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + + const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); + const requestTopic = config.get('request_topic'); + + logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); + logger.info('Kafka Requests Topic: %s', requestTopic); + + kafkaClient = new Kafka({ + brokers: kafkaBootstrapServers.split(','), + logLevel: logLevel.INFO, + logCreator: KafkaJsWinstonLogCreator + }); + + parseTopicProperties(); + + kafkaAdmin = kafkaClient.admin(); + await kafkaAdmin.connect(); + + let createRequestTopicResult = await createTopic(requestTopic); + + if (createRequestTopicResult) { + logger.info('Created new topic: %s', requestTopic); + } + + consumer = kafkaClient.consumer({groupId: 'js-executor-group'}); + producer = kafkaClient.producer(); + const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); + await consumer.connect(); + await producer.connect(); + await consumer.subscribe({topic: requestTopic}); + + logger.info('Started ThingsBoard JavaScript Executor Microservice.'); + await consumer.run({ + eachMessage: async ({topic, partition, message}) => { + let headers = message.headers; + let key = message.key; + let data = message.value; + let msg = {}; + + headers = Object.fromEntries( + Object.entries(headers).map(([key, value]) => [key, [...value]])); + + msg.key = key.toString('utf8'); + msg.data = [...data]; + msg.headers = {data: headers} + messageProcessor.onJsInvokeMessage(msg); + }, + }); + + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +function createTopic(topic) { + return kafkaAdmin.createTopics({ + topics: [{ + topic: topic, + replicationFactor: replicationFactor, + configEntries: configEntries + }] + }); +} + +function parseTopicProperties() { + const props = topicProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + configEntries.push({name: p.substring(0, delimiterPosition), value: p.substring(delimiterPosition + 1)}); + }); +} + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + + if (kafkaAdmin) { + logger.info('Stopping Kafka Admin...'); + await kafkaAdmin.disconnect(); + logger.info('Kafka Admin stopped.'); + } + + if (consumer) { + logger.info('Stopping Kafka Consumer...'); + let _consumer = consumer; + consumer = null; + try { + await _consumer.disconnect(); + logger.info('Kafka Consumer stopped.'); + await disconnectProducer(); + process.exit(status); + } catch (e) { + logger.info('Kafka Consumer stop error.'); + await disconnectProducer(); + process.exit(status); + } + } else { + process.exit(status); + } +} + +async function disconnectProducer() { + if (producer) { + logger.info('Stopping Kafka Producer...'); + var _producer = producer; + producer = null; + try { + await _producer.disconnect(); + logger.info('Kafka Producer stopped.'); + } catch (e) { + logger.info('Kafka Producer stop error.'); + } + } +} diff --git a/msa/js-executor/queue/pubSubTemplate.js b/msa/js-executor/queue/pubSubTemplate.js new file mode 100644 index 0000000000..17e1b56e1d --- /dev/null +++ b/msa/js-executor/queue/pubSubTemplate.js @@ -0,0 +1,154 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('pubSubTemplate'); +const {PubSub} = require('@google-cloud/pubsub'); + +const projectId = config.get('pubsub.project_id'); +const credentials = JSON.parse(config.get('pubsub.service_account')); +const requestTopic = config.get('request_topic'); +const queueProperties = config.get('pubsub.queue-properties'); + +let pubSubClient; + +const topics = []; +const subscriptions = []; +const queueProps = []; + +function PubSubProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + + if (!(subscriptions.includes(responseTopic) && topics.includes(requestTopic))) { + await createTopic(requestTopic); + } + + let data = JSON.stringify( + { + key: scriptId, + data: [...rawResponse], + headers: headers + }); + let dataBuffer = Buffer.from(data); + return pubSubClient.topic(responseTopic).publish(dataBuffer); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + pubSubClient = new PubSub({projectId: projectId, credentials: credentials}); + + parseQueueProperties(); + + const topicList = await pubSubClient.getTopics(); + + if (topicList) { + topicList[0].forEach(topic => { + topics.push(getName(topic.name)); + }); + } + + const subscriptionList = await pubSubClient.getSubscriptions(); + + if (subscriptionList) { + topicList[0].forEach(sub => { + subscriptions.push(getName(sub.name)); + }); + } + + if (!(subscriptions.includes(requestTopic) && topics.includes(requestTopic))) { + await createTopic(requestTopic); + } + + const subscription = pubSubClient.subscription(requestTopic); + + const messageProcessor = new JsInvokeMessageProcessor(new PubSubProducer()); + + const messageHandler = message => { + + messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); + message.ack(); + }; + + subscription.on('message', messageHandler); + + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +async function createTopic(topic) { + if (!topics.includes(topic)) { + await pubSubClient.createTopic(topic); + topics.push(topic); + logger.info('Created new Pub/Sub topic: %s', topic); + } + await createSubscription(topic) +} + +async function createSubscription(topic) { + if (!subscriptions.includes(topic)) { + await pubSubClient.createSubscription(topic, topic, { + topic: topic, + subscription: topic, + ackDeadlineSeconds: queueProps['ackDeadlineInSec'], + messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']} + }); + subscriptions.push(topic); + logger.info('Created new Pub/Sub subscription: %s', topic); + } +} + +function parseQueueProperties() { + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + queueProps[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); +} + +function getName(fullName) { + const delimiterPosition = fullName.lastIndexOf('/'); + return fullName.substring(delimiterPosition + 1); +} + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + if (pubSubClient) { + logger.info('Stopping Pub/Sub client.') + try { + await pubSubClient.close(); + logger.info('Pub/Sub client stopped.') + process.exit(status); + } catch (e) { + logger.info('Pub/Sub client stop error.'); + process.exit(status); + } + } else { + process.exit(status); + } +} + diff --git a/msa/js-executor/queue/rabbitmqTemplate.js b/msa/js-executor/queue/rabbitmqTemplate.js new file mode 100644 index 0000000000..1a2905c3a0 --- /dev/null +++ b/msa/js-executor/queue/rabbitmqTemplate.js @@ -0,0 +1,177 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('rabbitmqTemplate'); + +const requestTopic = config.get('request_topic'); +const host = config.get('rabbitmq.host'); +const port = config.get('rabbitmq.port'); +const vhost = config.get('rabbitmq.virtual_host'); +const username = config.get('rabbitmq.username'); +const password = config.get('rabbitmq.password'); +const queueProperties = config.get('rabbitmq.queue-properties'); +const poolInterval = config.get('js.response_poll_interval'); + +const amqp = require('amqplib/callback_api'); + +let queueParams = {durable: false, exclusive: false, autoDelete: false}; +let connection; +let channel; +let stopped = false; +const responseTopics = []; + +function RabbitMqProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + + if (!responseTopics.includes(responseTopic)) { + await createQueue(responseTopic); + responseTopics.push(responseTopic); + } + + let data = JSON.stringify( + { + key: scriptId, + data: [...rawResponse], + headers: headers + }); + let dataBuffer = Buffer.from(data); + channel.sendToQueue(responseTopic, dataBuffer); + return new Promise((resolve, reject) => { + channel.waitForConfirms((err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + const url = `amqp://${host}:${port}${vhost}`; + + amqp.credentials.amqplain(username, password); + connection = await new Promise((resolve, reject) => { + amqp.connect(url, function (err, connection) { + if (err) { + reject(err); + } else { + resolve(connection); + } + }); + }); + + channel = await new Promise((resolve, reject) => { + connection.createConfirmChannel(function (err, channel) { + if (err) { + reject(err); + } else { + resolve(channel); + } + }); + }); + + parseQueueProperties(); + + await createQueue(requestTopic); + + const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer()); + + while (!stopped) { + let message = await new Promise((resolve, reject) => { + channel.get(requestTopic, {}, function (err, msg) { + if (err) { + reject(err); + } else { + resolve(msg); + } + }); + }); + + if (message) { + messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); + channel.ack(message); + } else { + await sleep(poolInterval); + } + } + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +function parseQueueProperties() { + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); +} + +function createQueue(topic) { + return new Promise((resolve, reject) => { + channel.assertQueue(topic, queueParams, function (err) { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + + if (channel) { + logger.info('Stopping RabbitMq chanel.') + await channel.close(); + logger.info('RabbitMq chanel stopped'); + } + + if (connection) { + logger.info('Stopping RabbitMq connection.') + try { + await connection.close(); + logger.info('RabbitMq client connection.') + process.exit(status); + } catch (e) { + logger.info('RabbitMq connection stop error.'); + process.exit(status); + } + } else { + process.exit(status); + } +} diff --git a/msa/js-executor/queue/serviceBusTemplate.js b/msa/js-executor/queue/serviceBusTemplate.js new file mode 100644 index 0000000000..034921afb7 --- /dev/null +++ b/msa/js-executor/queue/serviceBusTemplate.js @@ -0,0 +1,194 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const config = require('config'), + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), + logger = require('../config/logger')._logger('serviceBusTemplate'); +const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus"); +const azure = require('azure-sb'); + +const requestTopic = config.get('request_topic'); +const namespaceName = config.get('service_bus.namespace_name'); +const sasKeyName = config.get('service_bus.sas_key_name'); +const sasKey = config.get('service_bus.sas_key'); +const queueProperties = config.get('service_bus.queue-properties'); + +let sbClient; +let receiverClient; +let receiver; +let serviceBusService; + +let queueOptions = {}; +const queues = []; +const senderMap = new Map(); + +function ServiceBusProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + if (!queues.includes(requestTopic)) { + await createQueueIfNotExist(requestTopic); + queues.push(requestTopic); + } + + let customSender = senderMap.get(responseTopic); + + if (!customSender) { + customSender = new CustomSender(responseTopic); + senderMap.set(responseTopic, customSender); + } + + let data = { + key: scriptId, + data: [...rawResponse], + headers: headers + }; + + return customSender.send({body: data}); + } +} + +function CustomSender(topic) { + this.queueClient = sbClient.createQueueClient(topic); + this.sender = this.queueClient.createSender(); + + this.send = async (message) => { + return this.sender.send(message); + } +} + +(async () => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + + const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`; + sbClient = ServiceBusClient.createFromConnectionString(connectionString); + serviceBusService = azure.createServiceBusService(connectionString); + + parseQueueProperties(); + + await new Promise((resolve, reject) => { + serviceBusService.listQueues((err, data) => { + if (err) { + reject(err); + } else { + data.forEach(queue => { + queues.push(queue.QueueName); + }); + resolve(); + } + }); + }); + + if (!queues.includes(requestTopic)) { + await createQueueIfNotExist(requestTopic); + queues.push(requestTopic); + } + + receiverClient = sbClient.createQueueClient(requestTopic); + receiver = receiverClient.createReceiver(ReceiveMode.peekLock); + + const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer()); + + const messageHandler = async (message) => { + if (message) { + messageProcessor.onJsInvokeMessage(message.body); + await message.complete(); + } + }; + const errorHandler = (error) => { + logger.error('Failed to receive message from queue.', error); + }; + receiver.registerMessageHandler(messageHandler, errorHandler); + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +async function createQueueIfNotExist(topic) { + return new Promise((resolve, reject) => { + serviceBusService.createQueueIfNotExists(topic, queueOptions, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +function parseQueueProperties() { + let properties = {}; + const props = queueProperties.split(';'); + props.forEach(p => { + const delimiterPosition = p.indexOf(':'); + properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); + }); + queueOptions = { + MaxSizeInMegabytes: properties['maxSizeInMb'], + DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, + LockDuration: `PT${properties['lockDurationInSec']}S` + }; +} + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + logger.info('Stopping Azure Service Bus resources...') + if (receiver) { + try { + await receiver.close(); + } catch (e) { + + } + } + + if (receiverClient) { + try { + await receiverClient.close(); + } catch (e) { + + } + } + + senderMap.forEach((k, v) => { + try { + v.sender.close(); + } catch (e) { + + } + try { + v.queueClient.close(); + } catch (e) { + + } + }); + + if (sbClient) { + try { + sbClient.close(); + } catch (e) { + + } + } + logger.info('Azure Service Bus resources stopped.') + process.exit(status); +} \ No newline at end of file diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index f56e5bb766..58361016c4 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -13,89 +13,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -const { logLevel, Kafka } = require('kafkajs'); -const config = require('config'), - JsInvokeMessageProcessor = require('./api/jsInvokeMessageProcessor'), - logger = require('./config/logger')._logger('main'), - KafkaJsWinstonLogCreator = require('./config/logger').KafkaJsWinstonLogCreator; - -var kafkaClient; -var consumer; -var producer; - -(async() => { - try { - logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - - const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); - const kafkaRequestTopic = config.get('kafka.request_topic'); - - logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); - logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); - - kafkaClient = new Kafka({ - brokers: kafkaBootstrapServers.split(','), - logLevel: logLevel.INFO, - logCreator: KafkaJsWinstonLogCreator - }); - - consumer = kafkaClient.consumer({ groupId: 'js-executor-group' }); - producer = kafkaClient.producer(); - const messageProcessor = new JsInvokeMessageProcessor(producer); - await consumer.connect(); - await producer.connect(); - await consumer.subscribe({ topic: kafkaRequestTopic}); - - logger.info('Started ThingsBoard JavaScript Executor Microservice.'); - await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { - messageProcessor.onJsInvokeMessage(message); - }, - }); - - } catch (e) { - logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - logger.error(e.stack); - exit(-1); - } -})(); - -process.on('exit', () => { - exit(0); -}); - -async function exit(status) { - logger.info('Exiting with status: %d ...', status); - if (consumer) { - logger.info('Stopping Kafka Consumer...'); - var _consumer = consumer; - consumer = null; - try { - await _consumer.disconnect(); - logger.info('Kafka Consumer stopped.'); - await disconnectProducer(); - process.exit(status); - } catch (e) { - logger.info('Kafka Consumer stop error.'); - await disconnectProducer(); - process.exit(status); - } - } else { - process.exit(status); - } +const config = require('config'), logger = require('./config/logger')._logger('main'); + +const serviceType = config.get('service-type'); +switch (serviceType) { + case 'kafka': + logger.info('Starting kafka template.'); + require('./queue/kafkaTemplate'); + logger.info('kafka template started.'); + break; + case 'pubsub': + logger.info('Starting Pub/Sub template.') + require('./queue/pubSubTemplate'); + logger.info('Pub/Sub template started.') + break; + case 'aws-sqs': + logger.info('Starting Aws Sqs template.') + require('./queue/awsSqsTemplate'); + logger.info('Aws Sqs template started.') + break; + case 'rabbitmq': + logger.info('Starting RabbitMq template.') + require('./queue/rabbitmqTemplate'); + logger.info('RabbitMq template started.') + break; + case 'service-bus': + logger.info('Starting Azure Service Bus template.') + require('./queue/serviceBusTemplate'); + logger.info('Azure Service Bus template started.') + break; + default: + logger.error('Unknown service type: ', serviceType); + process.exit(-1); } -async function disconnectProducer() { - if (producer) { - logger.info('Stopping Kafka Producer...'); - var _producer = producer; - producer = null; - try { - await _producer.disconnect(); - logger.info('Kafka Producer stopped.'); - } catch (e) { - logger.info('Kafka Producer stop error.'); - } - } -} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 1807c5c4fe..e2bb07fb6f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -41,7 +41,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; name = "delay", configClazz = TbMsgDelayNodeConfiguration.class, nodeDescription = "Delays incoming message", - nodeDetails = "Delays messages for configurable period.", + nodeDetails = "Delays messages for configurable period. Please note, this node acknowledges the message from the current queue (message will be removed from queue)", icon = "pause", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeMsgDelayConfig"