From cce6a44adc8a701ec0c08b7bfe91ff6f5388c138 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 29 Apr 2020 10:14:05 +0300 Subject: [PATCH] refactored queue factories --- .../provider/AwsSqsMonolithQueueFactory.java | 5 +-- .../provider/AwsSqsTbCoreQueueFactory.java | 28 +++++++++++++- .../AwsSqsTbRuleEngineQueueFactory.java | 30 ++++++++++++++- .../provider/PubSubTbCoreQueueFactory.java | 38 ++++++++++++++++--- .../PubSubTbRuleEngineQueueFactory.java | 28 +++++++++++++- .../provider/RabbitMqTbCoreQueueFactory.java | 28 +++++++++++++- .../RabbitMqTbRuleEngineQueueFactory.java | 28 +++++++++++++- 7 files changed, 170 insertions(+), 15 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index c86baf9c48..c1d724c207 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -20,7 +20,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.gen.js.JsInvokeProtos.*; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; @@ -31,8 +32,6 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; -import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java index a7349091bd..4889a7182d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -30,11 +32,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @@ -44,6 +48,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-core'") @@ -55,6 +60,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -68,6 +74,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbAwsSqsQueueAttributes sqsQueueAttributes) { this.sqsSettings = sqsSettings; this.coreSettings = coreSettings; @@ -75,6 +82,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -133,8 +141,26 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java index 3056eced2c..cf43bc5fe4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -27,11 +29,13 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; @@ -41,6 +45,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='tb-rule-engine'") @@ -51,6 +56,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbAwsSqsSettings sqsSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -61,12 +67,14 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbAwsSqsSettings sqsSettings, - TbAwsSqsQueueAttributes sqsQueueAttributes) { + TbAwsSqsQueueAttributes sqsQueueAttributes, + TbQueueRemoteJsInvokeSettings jsInvokeSettings) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.sqsSettings = sqsSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); @@ -113,8 +121,26 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings, + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index 0a23d58e46..7d85b5c3ba 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,21 +30,24 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; -import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; -import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; -import org.thingsboard.server.queue.settings.TbQueueCoreSettings; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; +import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") @@ -53,6 +58,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin jsExecutorAdmin; @@ -64,12 +70,14 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueTransportApiSettings transportApiSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.transportApiSettings = transportApiSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings()); @@ -127,8 +135,26 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java index 79501130ed..a6105fea0c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; import org.thingsboard.server.queue.pubsub.TbPubSubSettings; import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") @@ -52,6 +57,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory private final TbQueueRuleEngineSettings ruleEngineSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -63,12 +69,14 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbPubSubSubscriptionSettings pubSubSubscriptionSettings) { this.pubSubSettings = pubSubSettings; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings()); this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings()); @@ -116,8 +124,26 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbPubSubProducerTemplate<>(jsExecutorAdmin, pubSubSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbPubSubConsumerTemplate<>(jsExecutorAdmin, pubSubSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java index d84bbedbd2..65d582c4bc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -30,6 +32,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -40,10 +43,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") @@ -55,6 +60,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueTransportApiSettings transportApiSettings; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -68,6 +74,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueRuleEngineSettings ruleEngineSettings, PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbRabbitMqQueueArguments queueArguments) { this.rabbitMqSettings = rabbitMqSettings; this.coreSettings = coreSettings; @@ -75,6 +82,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { this.ruleEngineSettings = ruleEngineSettings; this.partitionService = partitionService; this.serviceInfoProvider = serviceInfoProvider; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); @@ -133,8 +141,26 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java index a18f427fab..26abf78164 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbRuleEngineQueueFactory.java @@ -15,7 +15,9 @@ */ package org.thingsboard.server.queue.provider; +import com.google.protobuf.util.JsonFormat; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; @@ -28,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -38,10 +41,12 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; @Component @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") @@ -52,6 +57,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueRuleEngineSettings ruleEngineSettings; private final TbRabbitMqSettings rabbitMqSettings; + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -62,12 +68,14 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor TbQueueRuleEngineSettings ruleEngineSettings, TbServiceInfoProvider serviceInfoProvider, TbRabbitMqSettings rabbitMqSettings, + TbQueueRemoteJsInvokeSettings jsInvokeSettings, TbRabbitMqQueueArguments queueArguments) { this.partitionService = partitionService; this.coreSettings = coreSettings; this.serviceInfoProvider = serviceInfoProvider; this.ruleEngineSettings = ruleEngineSettings; this.rabbitMqSettings = rabbitMqSettings; + this.jsInvokeSettings = jsInvokeSettings; this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); @@ -114,8 +122,26 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor } @Override + @Bean public TbQueueRequestTemplate, TbProtoQueueMsg> createRemoteJsRequestTemplate() { - return null; + TbQueueProducer> producer = new TbRabbitMqProducerTemplate<>(jsExecutorAdmin, rabbitMqSettings, jsInvokeSettings.getRequestTopic()); + TbQueueConsumer> consumer = new TbRabbitMqConsumerTemplate<>(jsExecutorAdmin, rabbitMqSettings, + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(), + msg -> { + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); + }); + + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueRequestTemplate.builder(); + builder.queueAdmin(jsExecutorAdmin); + builder.requestTemplate(producer); + builder.responseTemplate(consumer); + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout()); + builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); + return builder.build(); } @PreDestroy