diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index acf1a3161c..2752b53090 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -20,9 +20,9 @@ import com.datastax.driver.core.utils.UUIDs; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -292,7 +292,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .build(); sendToTransport(responseMsg, sessionInfo); } - }); + }, MoreExecutors.directExecutor()); } private ListenableFuture>> getAttributesKvEntries(GetAttributeRequestMsg request) { diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 45d773f387..d39465ff12 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -25,11 +25,10 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.common.DefaultTbQueueRequestTemplate; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.js.JsInvokeProtos; -import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; -import org.thingsboard.server.kafka.TBKafkaProducerTemplate; -import org.thingsboard.server.kafka.TbKafkaRequestTemplate; -import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.kafka.TbNodeIdProvider; import javax.annotation.Nullable; @@ -40,7 +39,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; @Slf4j @ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true) @@ -99,42 +97,43 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } } - private TbKafkaRequestTemplate kafkaTemplate; + private DefaultTbQueueRequestTemplate, TbProtoQueueMsg> defaultTemplate; private Map scriptIdToBodysMap = new ConcurrentHashMap<>(); @PostConstruct public void init() { - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder(); - requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId()); - requestBuilder.defaultTopic(requestTopic); - requestBuilder.encoder(new RemoteJsRequestEncoder()); - - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); - responseBuilder.settings(kafkaSettings); - responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId()); - responseBuilder.clientId("js-" + nodeIdProvider.getNodeId()); - responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId()); - responseBuilder.autoCommit(true); - responseBuilder.autoCommitIntervalMs(autoCommitInterval); - responseBuilder.decoder(new RemoteJsResponseDecoder()); - responseBuilder.requestIdExtractor((response) -> new UUID(response.getRequestIdMSB(), response.getRequestIdLSB())); - - TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder - builder = TbKafkaRequestTemplate.builder(); - builder.requestTemplate(requestBuilder.build()); - builder.responseTemplate(responseBuilder.build()); - builder.maxPendingRequests(maxPendingRequests); - builder.maxRequestTimeout(maxRequestsTimeout); - builder.pollInterval(responsePollDuration); - kafkaTemplate = builder.build(); - kafkaTemplate.init(); +// TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); +// requestBuilder.settings(kafkaSettings); +// requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId()); +// requestBuilder.defaultTopic(requestTopic); +// requestBuilder.encoder(new RemoteJsRequestEncoder()); + TbQueueProducer> producer; + +// TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); +// responseBuilder.settings(kafkaSettings); +// responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId()); +// responseBuilder.clientId("js-" + nodeIdProvider.getNodeId()); +// responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId()); +// responseBuilder.autoCommit(true); +// responseBuilder.autoCommitIntervalMs(autoCommitInterval); +// responseBuilder.decoder(new RemoteJsResponseDecoder()); +// responseBuilder.requestIdExtractor((response) -> new UUID(response.getRequestIdMSB(), response.getRequestIdLSB())); +// +// TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder +// builder = TbKafkaRequestTemplate.builder(); +// builder.requestTemplate(requestBuilder.build()); +// builder.responseTemplate(responseBuilder.build()); +// builder.maxPendingRequests(maxPendingRequests); +// builder.maxRequestTimeout(maxRequestsTimeout); +// builder.pollInterval(responsePollDuration); +// defaultTemplate = builder.build(); +// defaultTemplate.init(); } @PreDestroy public void destroy() { - if (kafkaTemplate != null) { - kafkaTemplate.stop(); + if (defaultTemplate != null) { + defaultTemplate.stop(); } } @@ -151,11 +150,12 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .build(); log.trace("Post compile request for scriptId [{}]", scriptId); - ListenableFuture future = kafkaTemplate.post(UUID.randomUUID().toString(), jsRequestWrapper); + ListenableFuture> future = defaultTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); + kafkaPushedMsgs.incrementAndGet(); - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(@Nullable JsInvokeProtos.RemoteJsResponse result) { + public void onSuccess(@Nullable TbProtoQueueMsg result) { kafkaEvalMsgs.incrementAndGet(); } @@ -168,7 +168,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } }); return Futures.transform(future, response -> { - JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); + JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse(); UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); if (compilationResult.getSuccess()) { scriptIdToNameMap.put(scriptId, functionName); @@ -202,11 +202,11 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setInvokeRequest(jsRequestBuilder.build()) .build(); - ListenableFuture future = kafkaTemplate.post(UUID.randomUUID().toString(), jsRequestWrapper); + ListenableFuture> future = defaultTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); kafkaPushedMsgs.incrementAndGet(); - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback>() { @Override - public void onSuccess(@Nullable JsInvokeProtos.RemoteJsResponse result) { + public void onSuccess(@Nullable TbProtoQueueMsg result) { kafkaInvokeMsgs.incrementAndGet(); } @@ -219,7 +219,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } }); return Futures.transform(future, response -> { - JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse(); + JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse(); if (invokeResult.getSuccess()) { return invokeResult.getResult(); } else { @@ -240,8 +240,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setReleaseRequest(jsRequest) .build(); - ListenableFuture future = kafkaTemplate.post(UUID.randomUUID().toString(), jsRequestWrapper); - JsInvokeProtos.RemoteJsResponse response = future.get(); + ListenableFuture> future = defaultTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); + JsInvokeProtos.RemoteJsResponse response = future.get().getValue(); JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse(); UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java index d07a2490ba..1b33fd85cc 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.script; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.kafka.TbKafkaEncoder; @@ -25,11 +26,11 @@ import java.nio.charset.StandardCharsets; /** * Created by ashvayka on 25.09.18. */ -public class RemoteJsRequestEncoder implements TbKafkaEncoder { +public class RemoteJsRequestEncoder implements TbKafkaEncoder> { @Override - public byte[] encode(JsInvokeProtos.RemoteJsRequest value) { + public byte[] encode(TbProtoQueueMsg value) { try { - return JsonFormat.printer().print(value).getBytes(StandardCharsets.UTF_8); + return JsonFormat.printer().print(value.getValue()).getBytes(StandardCharsets.UTF_8); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java index 7a3876fb91..ef15d5c877 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java @@ -16,6 +16,8 @@ package org.thingsboard.server.service.script; import com.google.protobuf.util.JsonFormat; +import org.thingsboard.server.TbQueueMsg; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.kafka.TbKafkaDecoder; @@ -25,12 +27,12 @@ import java.nio.charset.StandardCharsets; /** * Created by ashvayka on 25.09.18. */ -public class RemoteJsResponseDecoder implements TbKafkaDecoder { +public class RemoteJsResponseDecoder implements TbKafkaDecoder> { @Override - public JsInvokeProtos.RemoteJsResponse decode(byte[] data) throws IOException { + public TbProtoQueueMsg decode(TbQueueMsg msg) throws IOException { JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder(); - JsonFormat.parser().ignoringUnknownFields().merge(new String(data, StandardCharsets.UTF_8), builder); - return builder.build(); + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder); + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java index 37ff8ed93d..f6434036fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java @@ -21,10 +21,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -42,19 +41,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponse import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; -import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; -import org.thingsboard.server.kafka.TBKafkaProducerTemplate; -import org.thingsboard.server.kafka.TbKafkaResponseTemplate; -import org.thingsboard.server.kafka.TbKafkaSettings; -import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.state.DeviceStateService; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; /** @@ -84,17 +74,18 @@ public class LocalTransportApiService implements TransportApiService { private ReentrantLock deviceCreationLock = new ReentrantLock(); @Override - public ListenableFuture handle(TransportApiRequestMsg transportApiRequestMsg) { + public ListenableFuture> handle(TbProtoQueueMsg tbProtoQueueMsg) { + TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); if (transportApiRequestMsg.hasValidateTokenRequestMsg()) { ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg(); - return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN); + return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders())); } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) { ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg(); - return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE); + return Futures.transform(validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders())); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { - return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); + return Futures.transform(handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders())); } - return getEmptyTransportApiResponseFuture(); + return Futures.transform(getEmptyTransportApiResponseFuture(), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders())); } private ListenableFuture validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { @@ -145,7 +136,7 @@ public class LocalTransportApiService implements TransportApiService { try { ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder(); builder.setDeviceInfo(getDeviceInfoProto(device)); - if(!StringUtils.isEmpty(credentials.getCredentialsValue())){ + if (!StringUtils.isEmpty(credentials.getCredentialsValue())) { builder.setCredentialsBody(credentials.getCredentialsValue()); } return TransportApiResponseMsg.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java index 7aa438c64f..83353cea4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java @@ -22,9 +22,6 @@ import io.github.bucket4j.Bucket4j; import io.github.bucket4j.local.LocalBucket; import io.github.bucket4j.local.LocalBucketBuilder; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -33,15 +30,17 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.TbQueueCallback; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueMsgMetadata; +import org.thingsboard.server.TbQueueProducer; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; -import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; -import org.thingsboard.server.kafka.TBKafkaProducerTemplate; -import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.kafka.TbNodeIdProvider; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; @@ -51,6 +50,7 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.Duration; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -84,9 +84,9 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ @Value("${transport.remote.rule_engine.stats.enabled:false}") private boolean statsEnabled; - @Autowired - private TbKafkaSettings kafkaSettings; - +// @Autowired +// private TbKafkaSettings kafkaSettings; +// @Autowired private TbNodeIdProvider nodeIdProvider; @@ -101,8 +101,9 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ @Autowired private DataDecodingEncodingService encodingService; - private TBKafkaConsumerTemplate ruleEngineConsumer; - private TBKafkaProducerTemplate notificationsProducer; + private TbQueueConsumer> ruleEngineConsumer; + + private TbQueueProducer> notificationsProducer; private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-main-consumer")); @@ -146,8 +147,8 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ mainConsumerExecutor.execute(() -> { while (!stopped) { try { - ConsumerRecords records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration)); - int recordsCount = records.count(); + List> msgs = ruleEngineConsumer.poll(pollDuration); + int recordsCount = msgs.size(); if (recordsCount > 0) { while (!blockingPollRateBucket.tryConsume(recordsCount, TimeUnit.SECONDS.toNanos(5))) { log.info("Rule Engine consumer is busy. Required tokens: [{}]. Available tokens: [{}].", recordsCount, pollRateBucket.getAvailableTokens()); @@ -155,9 +156,9 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ } log.trace("Processing {} records", recordsCount); } - records.forEach(record -> { + msgs.forEach(msg -> { try { - ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record); + ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); log.trace("Forwarding message to rule engine {}", toRuleEngineMsg); if (toRuleEngineMsg.hasToDeviceActorMsg()) { forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg()); @@ -196,7 +197,8 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()); ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build(); log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg); - notificationsProducer.send(topic, sessionId.toString(), transportMsg, new QueueCallbackAdaptor(onSuccess, onFailure)); + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(sessionId, transportMsg); + notificationsProducer.send(topic, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure)); } private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) { @@ -225,7 +227,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ } } - private static class QueueCallbackAdaptor implements Callback { + private static class QueueCallbackAdaptor implements TbQueueCallback { private final Runnable onSuccess; private final Consumer onFailure; @@ -235,17 +237,17 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ } @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception == null) { - if (onSuccess != null) { - onSuccess.run(); - } - } else { - if (onFailure != null) { - onFailure.accept(exception); - } + public void onSuccess(TbQueueMsgMetadata metadata) { + if (onSuccess != null) { + onSuccess.run(); } } - } + @Override + public void onFailure(Throwable t) { + if (onFailure != null) { + onFailure.accept(t); + } + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 0c9efc7d1c..e354da7daa 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -22,9 +22,16 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.TbQueueRequestTemplate; +import org.thingsboard.server.TbQueueResponseTemplate; +import org.thingsboard.server.common.DefaultTbQueueRequestTemplate; +import org.thingsboard.server.common.DefaultTbQueueResponseTemplate; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; -import org.thingsboard.server.kafka.*; +import org.thingsboard.server.kafka.TbNodeIdProvider; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -49,9 +56,9 @@ public class RemoteTransportApiService { @Value("${transport.remote.transport_api.request_auto_commit_interval}") private int autoCommitInterval; - @Autowired - private TbKafkaSettings kafkaSettings; - +// @Autowired +// private TbKafkaSettings kafkaSettings; +// @Autowired private TbNodeIdProvider nodeIdProvider; @@ -60,7 +67,7 @@ public class RemoteTransportApiService { private ExecutorService transportCallbackExecutor; - private TbKafkaResponseTemplate transportApiTemplate; + private TbQueueResponseTemplate, TbProtoQueueMsg> transportApiTemplate; @PostConstruct public void init() { @@ -79,11 +86,14 @@ public class RemoteTransportApiService { requestBuilder.autoCommit(true); requestBuilder.autoCommitIntervalMs(autoCommitInterval); requestBuilder.decoder(new TransportApiRequestDecoder()); + TbQueueProducer> producer = null; + TbQueueConsumer> consumer = null; + - TbKafkaResponseTemplate.TbKafkaResponseTemplateBuilder - builder = TbKafkaResponseTemplate.builder(); - builder.requestTemplate(requestBuilder.build()); - builder.responseTemplate(responseBuilder.build()); + DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder + , TbProtoQueueMsg> builder = DefaultTbQueueResponseTemplate.builder(); + builder.requestTemplate(consumer); + builder.responseTemplate(producer); builder.maxPendingRequests(maxPendingRequests); builder.requestTimeout(requestTimeout); builder.pollInterval(responsePollDuration); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java b/application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java index 9f08463f91..bdebacbd13 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.transport; +import org.thingsboard.server.TbQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.kafka.TbKafkaDecoder; @@ -24,8 +25,9 @@ import java.io.IOException; * Created by ashvayka on 05.10.18. */ public class ToRuleEngineMsgDecoder implements TbKafkaDecoder { + @Override - public ToRuleEngineMsg decode(byte[] data) throws IOException { - return ToRuleEngineMsg.parseFrom(data); + public ToRuleEngineMsg decode(TbQueueMsg msg) throws IOException { + return ToRuleEngineMsg.parseFrom(msg.getData()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java index 2964934313..7d572d08ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java @@ -15,11 +15,12 @@ */ package org.thingsboard.server.service.transport; +import org.thingsboard.server.TbQueueHandler; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.kafka.TbKafkaHandler; /** * Created by ashvayka on 05.10.18. */ -public interface TransportApiService extends TbKafkaHandler { +public interface TransportApiService extends TbQueueHandler, TbProtoQueueMsg> { } diff --git a/common/queue/src/main/java/org/thingsboard/server/TbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/TbQueueConsumer.java index 181d50bdd1..ca51eb4ddb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/TbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/TbQueueConsumer.java @@ -8,6 +8,8 @@ public interface TbQueueConsumer { void subscribe(); + void unsubscribe(); + List poll(long durationInMillis); void commit(); diff --git a/common/queue/src/main/java/org/thingsboard/server/TbQueueMsgHeaders.java b/common/queue/src/main/java/org/thingsboard/server/TbQueueMsgHeaders.java index e1a5b4861d..9dad87588d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/TbQueueMsgHeaders.java +++ b/common/queue/src/main/java/org/thingsboard/server/TbQueueMsgHeaders.java @@ -1,8 +1,12 @@ package org.thingsboard.server; +import java.util.Map; + public interface TbQueueMsgHeaders { byte[] put(String key, byte[] value); byte[] get(String key); + + Map getData(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueMsgHeaders.java b/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueMsgHeaders.java index 4ef33a489d..e547a6c070 100644 --- a/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueMsgHeaders.java +++ b/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueMsgHeaders.java @@ -7,7 +7,7 @@ import java.util.Map; public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders { - private final Map data = new HashMap<>(); + protected final Map data = new HashMap<>(); @Override public byte[] put(String key, byte[] value) { @@ -18,4 +18,9 @@ public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders { public byte[] get(String key) { return data.get(key); } + + @Override + public Map getData() { + return data; + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/common/TbProtoQueueMsg.java b/common/queue/src/main/java/org/thingsboard/server/common/TbProtoQueueMsg.java index f1120b876f..c072dc4d67 100644 --- a/common/queue/src/main/java/org/thingsboard/server/common/TbProtoQueueMsg.java +++ b/common/queue/src/main/java/org/thingsboard/server/common/TbProtoQueueMsg.java @@ -11,13 +11,13 @@ public class TbProtoQueueMsg i private final UUID key; private final T value; - private final DefaultTbQueueMsgHeaders headers; + private final TbQueueMsgHeaders headers; public TbProtoQueueMsg(UUID key, T value) { this(key, value, new DefaultTbQueueMsgHeaders()); } - public TbProtoQueueMsg(UUID key, T value, DefaultTbQueueMsgHeaders headers) { + public TbProtoQueueMsg(UUID key, T value, TbQueueMsgHeaders headers) { this.key = key; this.value = value; this.headers = headers; diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/KafkaTbQueueMsg.java b/common/queue/src/main/java/org/thingsboard/server/kafka/KafkaTbQueueMsg.java new file mode 100644 index 0000000000..e44a93a70d --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/KafkaTbQueueMsg.java @@ -0,0 +1,39 @@ +package org.thingsboard.server.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.thingsboard.server.TbQueueMsg; +import org.thingsboard.server.TbQueueMsgHeaders; +import org.thingsboard.server.common.DefaultTbQueueMsgHeaders; + +import java.util.UUID; + +public class KafkaTbQueueMsg implements TbQueueMsg { + private final UUID key; + private final TbQueueMsgHeaders headers; + private final byte[] data; + + public KafkaTbQueueMsg(ConsumerRecord record) { + this.key = UUID.fromString(record.key()); + TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); + record.headers().forEach(header -> { + headers.put(header.key(), header.value()); + }); + this.headers = headers; + this.data = record.value(); + } + + @Override + public UUID getKey() { + return key; + } + + @Override + public TbQueueMsgHeaders getHeaders() { + return headers; + } + + @Override + public byte[] getData() { + return data; + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/KafkaTbQueueMsgMetadata.java b/common/queue/src/main/java/org/thingsboard/server/kafka/KafkaTbQueueMsgMetadata.java new file mode 100644 index 0000000000..4698c226d6 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/KafkaTbQueueMsgMetadata.java @@ -0,0 +1,12 @@ +package org.thingsboard.server.kafka; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.thingsboard.server.TbQueueMsgMetadata; + +@Data +@AllArgsConstructor +public class KafkaTbQueueMsgMetadata implements TbQueueMsgMetadata { + private RecordMetadata metadata; +} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java index 0fe86e030b..9c6d911d33 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.kafka; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaFuture; +import org.thingsboard.server.TbQueueAdmin; import java.time.Duration; import java.util.Collections; @@ -30,7 +32,7 @@ import java.util.concurrent.TimeoutException; /** * Created by ashvayka on 24.09.18. */ -public class TBKafkaAdmin { +public class TBKafkaAdmin implements TbQueueAdmin { AdminClient client; @@ -66,4 +68,9 @@ public class TBKafkaAdmin { } } + @Override + public ListenableFuture createTopicIfNotExists(String topic) { + + return null; + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java index 73f8cc16c7..6f0ce7b55c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java @@ -17,21 +17,27 @@ package org.thingsboard.server.kafka; import lombok.Builder; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueMsg; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.UUID; /** * Created by ashvayka on 24.09.18. */ -public class TBKafkaConsumerTemplate { +@Slf4j +public class TBKafkaConsumerTemplate implements TbQueueConsumer { private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; @@ -66,23 +72,55 @@ public class TBKafkaConsumerTemplate { this.topic = topic; } + @Override public void subscribe() { consumer.subscribe(Collections.singletonList(topic)); } + @Override + public List poll(long durationInMillis) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(durationInMillis)); + if (records.count() > 0) { + List result = new ArrayList<>(); + records.forEach(record -> { + try { + result.add(decode(record)); + } catch (IOException e) { + log.error("Failed decode record: [{}]", record); + } + }); + return result; + } + + return Collections.emptyList(); + } + + @Override + public void commit() { + consumer.commitAsync(); + } + + @Override public void unsubscribe() { consumer.unsubscribe(); } - public ConsumerRecords poll(Duration duration) { - return consumer.poll(duration); - } +// public void subscribe() { +// consumer.subscribe(Collections.singletonList(topic)); +// } +// + +// +// public ConsumerRecords poll(Duration duration) { +// return consumer.poll(duration); +// } public T decode(ConsumerRecord record) throws IOException { - return decoder.decode(record.value()); + return decoder.decode(new KafkaTbQueueMsg(record)); } public UUID extractRequestId(T value) { return requestIdExtractor.extractRequestId(value); } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index 8722c1d64e..938b278207 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -15,42 +15,45 @@ */ package org.thingsboard.server.kafka; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; import lombok.Builder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.util.StringUtils; +import org.thingsboard.server.TbQueueCallback; +import org.thingsboard.server.TbQueueMsg; +import org.thingsboard.server.TbQueueMsgMetadata; +import org.thingsboard.server.TbQueueProducer; import java.util.List; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Created by ashvayka on 24.09.18. */ @Slf4j -public class TBKafkaProducerTemplate { +public class TBKafkaProducerTemplate implements TbQueueProducer { private final KafkaProducer producer; - private final TbKafkaEncoder encoder; private final TbKafkaPartitioner partitioner; + private ConcurrentMap> partitionInfoMap; + @Getter private final String defaultTopic; @@ -58,8 +61,7 @@ public class TBKafkaProducerTemplate { private final TbKafkaSettings settings; @Builder - private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder encoder, - TbKafkaPartitioner partitioner, String defaultTopic, String clientId) { + private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaPartitioner partitioner, String defaultTopic, String clientId) { Properties props = settings.toProps(); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); @@ -68,7 +70,6 @@ public class TBKafkaProducerTemplate { } this.settings = settings; this.producer = new KafkaProducer<>(props); - this.encoder = encoder; this.partitioner = partitioner; this.defaultTopic = defaultTopic; } @@ -89,43 +90,68 @@ public class TBKafkaProducerTemplate { } } - public Future send(String key, T value, Callback callback) { - return send(key, value, null, callback); - } - - public Future send(String key, T value, Iterable
headers, Callback callback) { - return send(key, value, null, headers, callback); + @Override + public ListenableFuture send(T msg, TbQueueCallback callback) { + return send(defaultTopic, msg, callback); } - public Future send(String key, T value, Long timestamp, Iterable
headers, Callback callback) { - if (!StringUtils.isEmpty(this.defaultTopic)) { - return send(this.defaultTopic, key, value, timestamp, headers, callback); - } else { - throw new RuntimeException("Failed to send message! Default topic is not specified!"); - } - } + @Override + public ListenableFuture send(String topic, T msg, TbQueueCallback callback) { + String key = msg.getKey().toString(); + byte[] data = msg.getData(); + ProducerRecord record; + Iterable
headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList()); - public Future send(String topic, String key, T value, Iterable
headers, Callback callback) { - return send(topic, key, value, null, headers, callback); - } + Integer partition = getPartition(topic, msg); + record = new ProducerRecord<>(topic, partition, key, data, headers); + Future result = producer.send(record, (metadata, exception) -> { + if (exception == null) { + callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata)); + } else { + callback.onFailure(exception); + } + }); - public Future send(String topic, String key, T value, Callback callback) { - return send(topic, key, value, null, null, callback); + return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), metadata -> new KafkaTbQueueMsgMetadata(metadata)); } - public Future send(String topic, String key, T value, Long timestamp, Iterable
headers, Callback callback) { - byte[] data = encoder.encode(value); - ProducerRecord record; - Integer partition = getPartition(topic, key, value, data); - record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers); - return producer.send(record, callback); - } +// public Future send(String key, T value, Callback callback) { +// return send(key, value, null, callback); +// } +// +// public Future send(String key, T value, Iterable
headers, Callback callback) { +// return send(key, value, null, headers, callback); +// } +// +// public Future send(String key, T value, Long timestamp, Iterable
headers, Callback callback) { +// if (!StringUtils.isEmpty(this.defaultTopic)) { +// return send(this.defaultTopic, key, value, timestamp, headers, callback); +// } else { +// throw new RuntimeException("Failed to send message! Default topic is not specified!"); +// } +// } +// +// public Future send(String topic, String key, T value, Iterable
headers, Callback callback) { +// return send(topic, key, value, null, headers, callback); +// } +// +// public Future send(String topic, String key, T value, Callback callback) { +// return send(topic, key, value, null, null, callback); +// } +// +// public Future send(String topic, String key, T value, Long timestamp, Iterable
headers, Callback callback) { +// byte[] data = encoder.encode(value); +// ProducerRecord record; +// Integer partition = getPartition(topic, key, value, data); +// record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers); +// return producer.send(record, callback); +// } - private Integer getPartition(String topic, String key, T value, byte[] data) { + private Integer getPartition(String topic, T value) { if (partitioner == null) { return null; } else { - return partitioner.partition(topic, key, value, data, partitionInfoMap.computeIfAbsent(topic, producer::partitionsFor)); + return partitioner.partition(topic, value.getKey().toString(), value, value.getData(), partitionInfoMap.computeIfAbsent(topic, producer::partitionsFor)); } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java index ab196d5863..564d905675 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.kafka; +import org.thingsboard.server.TbQueueMsg; + import java.io.IOException; /** @@ -22,6 +24,6 @@ import java.io.IOException; */ public interface TbKafkaDecoder { - T decode(byte[] data) throws IOException; + T decode(TbQueueMsg msg) throws IOException; } diff --git a/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryStorage.java new file mode 100644 index 0000000000..abf30667b5 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryStorage.java @@ -0,0 +1,45 @@ +package org.thingsboard.server.memory; + +import org.thingsboard.server.TbQueueMsg; + +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; + +public final class InMemoryStorage { + private static InMemoryStorage instance; + private final Map> storage; + + private InMemoryStorage() { + storage = new ConcurrentHashMap<>(); + } + + public static InMemoryStorage getInstance() { + if (instance == null) { + synchronized (InMemoryStorage.class) { + if (instance == null) { + instance = new InMemoryStorage(); + } + } + } + return instance; + } + + public boolean put(String topic, TbQueueMsg msg) { + return storage.computeIfAbsent(topic, (t) -> new LinkedList<>()).add(msg); + } + + public TbQueueMsg get(String topic) { + if (storage.containsKey(topic)) { + return storage.get(topic).peek(); + } + return null; + } + + public void commit(String topic) { + if (storage.containsKey(topic)) { + storage.get(topic).remove(); + } + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryTbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryTbQueueConsumer.java new file mode 100644 index 0000000000..a0a35eefb8 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryTbQueueConsumer.java @@ -0,0 +1,42 @@ +package org.thingsboard.server.memory; + +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueMsg; + +import java.util.Collections; +import java.util.List; + +public class InMemoryTbQueueConsumer implements TbQueueConsumer { + private final InMemoryStorage storage = InMemoryStorage.getInstance(); + + public InMemoryTbQueueConsumer(String topic) { + this.topic = topic; + } + + private final String topic; + + @Override + public String getTopic() { + return topic; + } + + @Override + public void subscribe() { + + } + + @Override + public void unsubscribe() { + + } + + @Override + public List poll(long durationInMillis) { + return Collections.singletonList((T)storage.get(topic)); + } + + @Override + public void commit() { + storage.commit(topic); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryTbQueueProducer.java b/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryTbQueueProducer.java new file mode 100644 index 0000000000..3b53ddf316 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/memory/InMemoryTbQueueProducer.java @@ -0,0 +1,43 @@ +package org.thingsboard.server.memory; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.TbQueueCallback; +import org.thingsboard.server.TbQueueMsg; +import org.thingsboard.server.TbQueueMsgMetadata; +import org.thingsboard.server.TbQueueProducer; + +public class InMemoryTbQueueProducer implements TbQueueProducer { + + private final InMemoryStorage storage = InMemoryStorage.getInstance(); + + private String defaultTopic; + + @Override + public void init() { + + } + + @Override + public String getDefaultTopic() { + return defaultTopic; + } + + @Override + public ListenableFuture send(T msg, TbQueueCallback callback) { + return send(defaultTopic, msg, callback); + } + + @Override + public ListenableFuture send(String topic, T msg, TbQueueCallback callback) { + boolean result = storage.put(topic, msg); + if (result) { + callback.onSuccess(null); + return Futures.immediateCheckedFuture(null); + } else { + Exception e = new RuntimeException("Failure add msg to InMemoryQueue"); + callback.onFailure(e); + return Futures.immediateFailedFuture(e); + } + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index dd7d9bd27a..921003dbd7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -40,7 +40,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; -import org.thingsboard.server.kafka.AsyncCallbackTemplate; +import org.thingsboard.server.common.AsyncCallbackTemplate; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/ToTransportMsgResponseDecoder.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/ToTransportMsgResponseDecoder.java index c07e406d2e..31a32d8368 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/ToTransportMsgResponseDecoder.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/ToTransportMsgResponseDecoder.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport.service; +import org.thingsboard.server.TbQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.kafka.TbKafkaDecoder; @@ -24,8 +25,9 @@ import java.io.IOException; * Created by ashvayka on 05.10.18. */ public class ToTransportMsgResponseDecoder implements TbKafkaDecoder { + @Override - public ToTransportMsg decode(byte[] data) throws IOException { - return ToTransportMsg.parseFrom(data); + public ToTransportMsg decode(TbQueueMsg msg) throws IOException { + return ToTransportMsg.parseFrom(msg.getData()); } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportApiResponseDecoder.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportApiResponseDecoder.java index a62e696500..89d61fba56 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportApiResponseDecoder.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportApiResponseDecoder.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.transport.service; +import org.thingsboard.server.TbQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.kafka.TbKafkaDecoder; @@ -24,8 +25,9 @@ import java.io.IOException; * Created by ashvayka on 05.10.18. */ public class TransportApiResponseDecoder implements TbKafkaDecoder { + @Override - public TransportApiResponseMsg decode(byte[] data) throws IOException { - return TransportApiResponseMsg.parseFrom(data); + public TransportApiResponseMsg decode(TbQueueMsg msg) throws IOException { + return TransportApiResponseMsg.parseFrom(msg.getData()); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java index c49fcc3728..5867e505b6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java @@ -18,12 +18,21 @@ package org.thingsboard.server.dao.entity; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.alarm.AlarmId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.customer.CustomerService; @@ -109,7 +118,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe default: throw new IllegalStateException("Not Implemented!"); } - entityName = Futures.transform(hasName, (Function) hasName1 -> hasName1 != null ? hasName1.getName() : null ); + entityName = Futures.transform(hasName, (Function) hasName1 -> hasName1 != null ? hasName1.getName() : null, MoreExecutors.directExecutor()); return entityName; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 68b7fce6a8..22b2543489 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -16,7 +16,10 @@ package org.thingsboard.server.dao.relation; import com.google.common.base.Function; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; @@ -206,17 +209,20 @@ public class BaseRelationService implements RelationService { relations -> { List> results = deleteRelationGroupsAsync(tenantId, relations, cache, true); return Futures.allAsList(results); - }); + }, MoreExecutors.directExecutor()); ListenableFuture> outboundDeletions = Futures.transformAsync(outboundRelations, relations -> { List> results = deleteRelationGroupsAsync(tenantId, relations, cache, false); return Futures.allAsList(results); - }); + }, MoreExecutors.directExecutor()); ListenableFuture>> deletionsFuture = Futures.allAsList(inboundDeletions, outboundDeletions); - return Futures.transform(Futures.transformAsync(deletionsFuture, (deletions) -> relationDao.deleteOutboundRelationsAsync(tenantId, entityId)), result -> null); + return Futures.transform(Futures.transformAsync(deletionsFuture, + (deletions) -> relationDao.deleteOutboundRelationsAsync(tenantId, entityId), + MoreExecutors.directExecutor()), + result -> null, MoreExecutors.directExecutor()); } private List> deleteRelationGroupsAsync(TenantId tenantId, List> relations, Cache cache, boolean deleteFromDb) { @@ -306,9 +312,11 @@ public class BaseRelationService implements RelationService { public void onSuccess(@Nullable List result) { cache.putIfAbsent(fromAndTypeGroup, result); } + @Override - public void onFailure(Throwable t) {} - }); + public void onFailure(Throwable t) { + } + }, MoreExecutors.directExecutor()); return relationsFuture; } } @@ -328,7 +336,7 @@ public class BaseRelationService implements RelationService { EntityRelationInfo::setToName)) ); return Futures.successfulAsList(futures); - }); + }, MoreExecutors.directExecutor()); } @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}") @@ -385,9 +393,11 @@ public class BaseRelationService implements RelationService { public void onSuccess(@Nullable List result) { cache.putIfAbsent(toAndTypeGroup, result); } + @Override - public void onFailure(Throwable t) {} - }); + public void onFailure(Throwable t) { + } + }, MoreExecutors.directExecutor()); return relationsFuture; } } @@ -407,7 +417,7 @@ public class BaseRelationService implements RelationService { EntityRelationInfo::setFromName)) ); return Futures.successfulAsList(futures); - }); + }, MoreExecutors.directExecutor()); } private ListenableFuture fetchRelationInfoAsync(TenantId tenantId, EntityRelation relation, @@ -418,7 +428,7 @@ public class BaseRelationService implements RelationService { EntityRelationInfo entityRelationInfo1 = new EntityRelationInfo(relation); entityNameSetter.accept(entityRelationInfo1, entityName1); return entityRelationInfo1; - }); + }, MoreExecutors.directExecutor()); } @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}") @@ -466,7 +476,7 @@ public class BaseRelationService implements RelationService { } } return relations; - }); + }, MoreExecutors.directExecutor()); } catch (Exception e) { log.warn("Failed to query relations: [{}]", query, e); throw new RuntimeException(e); @@ -493,7 +503,7 @@ public class BaseRelationService implements RelationService { })) ); return Futures.successfulAsList(futures); - }); + }, MoreExecutors.directExecutor()); } protected void validate(EntityRelation relation) { @@ -600,7 +610,7 @@ public class BaseRelationService implements RelationService { } //TODO: try to remove this blocking operation List> relations = Futures.successfulAsList(futures).get(); - if (fetchLastLevelOnly && lvl > 0){ + if (fetchLastLevelOnly && lvl > 0) { children.clear(); } relations.forEach(r -> r.forEach(children::add)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 176bc712e8..4ca53a337b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sqlts.timescale; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -143,7 +144,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements } else { return Collections.emptyList(); } - }); + }, MoreExecutors.directExecutor()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 8fc8b4ab8a..b96462e350 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -330,7 +331,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem stmt.setInt(6, (int) ttl); } futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null)); - return Futures.transform(Futures.allAsList(futures), result -> null); + return Futures.transform(Futures.allAsList(futures), result -> null, MoreExecutors.directExecutor()); } private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List> futures, long partition, DataType type) { @@ -545,7 +546,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public void onFailure(Throwable t) { log.warn("[{}] Failed to process remove of the latest value", entityId, t); } - }); + }, MoreExecutors.directExecutor()); return resultFuture; }