diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java index c072311fe2..751bde6303 100644 --- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java +++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java index 927584789d..416a4d6b0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java @@ -49,7 +49,7 @@ public class DefaultMsgQueueService implements MsgQueueService { @Autowired private MsgQueue msgQueue; - @Autowired + @Autowired(required = false) private TenantQuotaService quotaService; private ScheduledExecutorService cleanupExecutor; @@ -74,7 +74,7 @@ public class DefaultMsgQueueService implements MsgQueueService { @Override public ListenableFuture put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) { - if(quotaService.isQuotaExceeded(tenantId.getId().toString())) { + if(quotaService != null && quotaService.isQuotaExceeded(tenantId.getId().toString())) { log.warn("Tenant TbMsg Quota exceeded for [{}:{}] . Reject", tenantId.getId()); return Futures.immediateFailedFuture(new RuntimeException("Tenant TbMsg Quota exceeded")); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 37a62117be..b334ff15ae 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -103,9 +103,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { responseBuilder.autoCommit(true); responseBuilder.autoCommitIntervalMs(autoCommitInterval); responseBuilder.decoder(new RemoteJsResponseDecoder()); - responseBuilder.requestIdExtractor((response) -> { - return new UUID(response.getRequestIdMSB(), response.getRequestIdLSB()); - }); + responseBuilder.requestIdExtractor((response) -> new UUID(response.getRequestIdMSB(), response.getRequestIdLSB())); TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder builder = TbKafkaRequestTemplate.builder(); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 7831b340dc..0c7b5f4327 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -1,14 +1,165 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.service.transport; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.common.data.security.DeviceCredentialsType; +import org.thingsboard.server.dao.device.DeviceCredentialsService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenResponseMsg; +import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; +import org.thingsboard.server.kafka.TBKafkaProducerTemplate; +import org.thingsboard.server.kafka.TbKafkaResponseTemplate; +import org.thingsboard.server.kafka.TbKafkaSettings; +import org.thingsboard.server.service.cluster.discovery.DiscoveryService; + +import javax.annotation.PostConstruct; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Created by ashvayka on 05.10.18. */ @Slf4j @Service -@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false) +@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true") public class RemoteTransportApiService implements TransportApiService { + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Value("${transport.remote.transport_api.requests_topic}") + private String transportApiRequestsTopic; + @Value("${transport.remote.transport_api.responses_topic}") + private String transportApiResponsesTopic; + @Value("${transport.remote.transport_api.max_pending_requests}") + private int maxPendingRequests; + @Value("${transport.remote.transport_api.request_timeout}") + private long requestTimeout; + @Value("${transport.remote.transport_api.request_poll_interval}") + private int responsePollDuration; + @Value("${transport.remote.transport_api.request_auto_commit_interval}") + private int autoCommitInterval; + + @Autowired + private TbKafkaSettings kafkaSettings; + + @Autowired + private DiscoveryService discoveryService; + + @Autowired + private DeviceService deviceService; + + @Autowired + private DeviceCredentialsService deviceCredentialsService; + + private ExecutorService transportCallbackExecutor; + + private TbKafkaResponseTemplate transportApiTemplate; + + @PostConstruct + public void init() { + this.transportCallbackExecutor = Executors.newCachedThreadPool(); + + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder responseBuilder = TBKafkaProducerTemplate.builder(); + responseBuilder.settings(kafkaSettings); + responseBuilder.defaultTopic(transportApiResponsesTopic); + responseBuilder.encoder(new TransportApiResponseEncoder()); + + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder requestBuilder = TBKafkaConsumerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.topic(transportApiRequestsTopic); + requestBuilder.clientId(discoveryService.getNodeId()); + requestBuilder.groupId("tb-node"); + requestBuilder.autoCommit(true); + requestBuilder.autoCommitIntervalMs(autoCommitInterval); + requestBuilder.decoder(new TransportApiRequestDecoder()); + + TbKafkaResponseTemplate.TbKafkaResponseTemplateBuilder + builder = TbKafkaResponseTemplate.builder(); + builder.requestTemplate(requestBuilder.build()); + builder.responseTemplate(responseBuilder.build()); + builder.maxPendingRequests(maxPendingRequests); + builder.requestTimeout(requestTimeout); + builder.pollInterval(responsePollDuration); + builder.executor(transportCallbackExecutor); + builder.handler(this); + transportApiTemplate = builder.build(); + transportApiTemplate.init(); + } + + @Override + public ListenableFuture handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception { + if (transportApiRequestMsg.hasValidateTokenRequestMsg()) { + ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg(); + //TODO: Make async and enable caching + DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(msg.getToken()); + if (credentials != null && credentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) { + return getDeviceInfo(credentials.getDeviceId()); + } else { + return getEmptyTransportApiResponseFuture(); + } + } + return getEmptyTransportApiResponseFuture(); + } + + private ListenableFuture getDeviceInfo(DeviceId deviceId) { + return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> { + if (device == null) { + log.trace("[{}] Failed to lookup device by id", deviceId); + return getEmptyTransportApiResponse(); + } + try { + DeviceInfoProto deviceInfoProto = DeviceInfoProto.newBuilder() + .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) + .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) + .setDeviceName(device.getName()) + .setDeviceType(device.getType()) + .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo())) + .build(); + return TransportApiResponseMsg.newBuilder() + .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build(); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to lookup device by id", deviceId, e); + return getEmptyTransportApiResponse(); + } + }); + } + + private ListenableFuture getEmptyTransportApiResponseFuture() { + return Futures.immediateFuture(getEmptyTransportApiResponse()); + } + + private TransportApiResponseMsg getEmptyTransportApiResponse() { + return TransportApiResponseMsg.newBuilder() + .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.getDefaultInstance()).build(); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java new file mode 100644 index 0000000000..c54f1a0303 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.transport; + +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.kafka.TbKafkaDecoder; + +import java.io.IOException; + +/** + * Created by ashvayka on 05.10.18. + */ +public class TransportApiRequestDecoder implements TbKafkaDecoder { + @Override + public TransportApiRequestMsg decode(byte[] data) throws IOException { + return TransportApiRequestMsg.parseFrom(data); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java new file mode 100644 index 0000000000..175a92df96 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.transport; + +import org.thingsboard.server.kafka.TbKafkaEncoder; + +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; + +/** + * Created by ashvayka on 05.10.18. + */ +public class TransportApiResponseEncoder implements TbKafkaEncoder { + @Override + public byte[] encode(TransportApiResponseMsg value) { + return value.toByteArray(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java index 28a769a832..cc6f5b9f15 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java @@ -1,7 +1,25 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.service.transport; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.kafka.TbKafkaHandler; + /** * Created by ashvayka on 05.10.18. */ -public interface TransportApiService { +public interface TransportApiService extends TbKafkaHandler { } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f35df67265..c58f3a1a38 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -84,7 +84,7 @@ http: # MQTT server parameters mqtt: # Enable/disable mqtt transport protocol. - enabled: "${MQTT_ENABLED:true}" + enabled: "${MQTT_ENABLED:false}" bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}" bind_port: "${MQTT_BIND_PORT:1883}" adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" @@ -149,7 +149,7 @@ quota: # Interval for scheduled task that cleans expired records. TTL is used for expiring cleanPeriodMs: "${QUOTA_TENANT_CLEAN_PERIOD_MS:300000}" # Enable Host API Limits - enabled: "${QUOTA_TENANT_ENABLED:false}" + enabled: "${QUOTA_TENANT_ENABLED:true}" # Array of whitelist tenants whitelist: "${QUOTA_TENANT_WHITELIST:}" # Array of blacklist tenants @@ -450,3 +450,16 @@ js: response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}" + +transport: + remote: + enabled: "${REMOTE_TRANSPORT_ENABLED:true}" + transport_api: + requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}" + responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}" + max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}" + request_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}" + request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" + request_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:1000}" + rule_engine: + topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}" \ No newline at end of file diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java index 8d851bca83..386bfc574e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/AsyncCallbackTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/AsyncCallbackTemplate.java new file mode 100644 index 0000000000..b8ad758ef9 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/AsyncCallbackTemplate.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.kafka; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Created by ashvayka on 05.10.18. + */ +public class AsyncCallbackTemplate { + + public static void withCallbackAndTimeout(ListenableFuture future, + Consumer onSuccess, + Consumer onFailure, + long timeoutInMs, + ScheduledExecutorService timeoutExecutor, + Executor callbackExecutor) { + future = Futures.withTimeout(future, timeoutInMs, TimeUnit.MILLISECONDS, timeoutExecutor); + withCallback(future, onSuccess, onFailure, callbackExecutor); + } + + public static void withCallback(ListenableFuture future, Consumer onSuccess, + Consumer onFailure, Executor executor) { + FutureCallback callback = new FutureCallback() { + @Override + public void onSuccess(T result) { + try { + onSuccess.accept(result); + } catch (Throwable th) { + onFailure(th); + } + } + + @Override + public void onFailure(Throwable t) { + onFailure.accept(t); + } + }; + if (executor != null) { + Futures.addCallback(future, callback, executor); + } else { + Futures.addCallback(future, callback); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index 2611e9491a..1e109d2877 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -101,6 +101,10 @@ public class TBKafkaProducerTemplate { return send(this.defaultTopic, key, value, timestamp, headers); } + public Future send(String topic, String key, T value, Iterable

headers) { + return send(topic, key, value, null, headers); + } + public Future send(String topic, String key, T value, Long timestamp, Iterable
headers) { byte[] data = encoder.encode(value); ProducerRecord record; diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java index 66d53c3bde..a64d28e173 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java @@ -1,12 +1,27 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.kafka; -import java.util.function.Consumer; +import com.google.common.util.concurrent.ListenableFuture; /** * Created by ashvayka on 05.10.18. */ public interface TbKafkaHandler { - void handle(Request request, Consumer onSuccess, Consumer onFailure); + ListenableFuture handle(Request request) throws Exception; } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java index 1f533873cd..0dbf45db14 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,13 +21,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; -import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -40,12 +42,14 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT private final TBKafkaProducerTemplate responseTemplate; private final TbKafkaHandler handler; private final ConcurrentMap pendingRequests; - private final ExecutorService executor; + private final ExecutorService loopExecutor; + private final ScheduledExecutorService timeoutExecutor; + private final ExecutorService callbackExecutor; private final int maxPendingRequests; + private final long requestTimeout; private final long pollInterval; private volatile boolean stopped = false; - //TODO: private final AtomicInteger pendingRequestCount = new AtomicInteger(); @Builder @@ -53,6 +57,7 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT TBKafkaProducerTemplate responseTemplate, TbKafkaHandler handler, long pollInterval, + long requestTimeout, int maxPendingRequests, ExecutorService executor) { this.requestTemplate = requestTemplate; @@ -61,18 +66,24 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT this.pendingRequests = new ConcurrentHashMap<>(); this.maxPendingRequests = maxPendingRequests; this.pollInterval = pollInterval; - this.executor = executor; + this.requestTimeout = requestTimeout; + this.callbackExecutor = executor; + this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + this.loopExecutor = Executors.newSingleThreadExecutor(); } public void init() { this.responseTemplate.init(); requestTemplate.subscribe(); - executor.submit(() -> { + loopExecutor.submit(() -> { while (!stopped) { - if(pendingRequestCount.get() > maxPendingRequests){ - + while (pendingRequestCount.get() >= maxPendingRequests) { + try { + Thread.sleep(pollInterval); + } catch (InterruptedException e) { + log.trace("Failed to wait until the server has capacity to handle new requests", e); + } } - //TODO: we need to protect from reading too much requests. ConsumerRecords requests = requestTemplate.poll(Duration.ofMillis(pollInterval)); requests.forEach(request -> { Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); @@ -92,12 +103,27 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT } String responseTopic = bytesToString(responseTopicHeader.value()); try { + pendingRequestCount.getAndIncrement(); Request decodedRequest = requestTemplate.decode(request); - executor.submit(() -> handler.handle(decodedRequest, - response -> reply(requestId, responseTopic, response), - e -> log.error("[{}] Failed to process the request: {}", requestId, request, e))); + AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest), + response -> { + pendingRequestCount.decrementAndGet(); + reply(requestId, responseTopic, response); + }, + e -> { + pendingRequestCount.decrementAndGet(); + if (e.getCause() != null && e.getCause() instanceof TimeoutException) { + log.warn("[{}] Timedout to process the request: {}", requestId, request, e); + } else { + log.trace("[{}] Failed to process the request: {}", requestId, request, e); + } + }, + requestTimeout, + timeoutExecutor, + callbackExecutor); } catch (Throwable e) { - log.error("[{}] Failed to process the request: {}", requestId, request, e); + pendingRequestCount.decrementAndGet(); + log.warn("[{}] Failed to process the request: {}", requestId, request, e); } }); } @@ -106,10 +132,16 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT public void stop() { stopped = true; + if (timeoutExecutor != null) { + timeoutExecutor.shutdownNow(); + } + if (loopExecutor != null) { + loopExecutor.shutdownNow(); + } } private void reply(UUID requestId, String topic, Response response) { - responseTemplate.send(topic, response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)))); + responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)))); } } diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto index f0cc3c75ed..029bb14ebf 100644 --- a/common/transport/src/main/proto/transport.proto +++ b/common/transport/src/main/proto/transport.proto @@ -47,11 +47,13 @@ message TsKvListProto { } message DeviceInfoProto { - int64 deviceIdMSB = 1; - int64 deviceIdLSB = 2; - string deviceName = 3; - string deviceType = 4; - string additionalInfo = 5; + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + string deviceName = 5; + string deviceType = 6; + string additionalInfo = 7; } /** diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 8a596d7119..0853124acb 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -157,10 +157,10 @@ public class CoapTransportResource extends CoapResource { CoapSessionCtx ctx = new CoapSessionCtx(exchange, adaptor, processor, authService, timeout); - if (!ctx.login(credentials.get())) { - exchange.respond(ResponseCode.UNAUTHORIZED); - return Optional.empty(); - } +// if (!ctx.login(credentials.get())) { +// exchange.respond(ResponseCode.UNAUTHORIZED); +// return Optional.empty(); +// } AdaptorToSessionActorMsg msg; try { @@ -190,7 +190,7 @@ public class CoapTransportResource extends CoapResource { throw new IllegalArgumentException("Unsupported msg type: " + type); } log.trace("Processing msg: {}", msg); - processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); +// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); } catch (AdaptorException e) { log.debug("Failed to decode payload {}", e); exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage()); diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java index 7a703b2be3..c43a4ded7a 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java @@ -49,7 +49,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext { private final AtomicInteger seqNumber = new AtomicInteger(2); public CoapSessionCtx(CoapExchange exchange, CoapTransportAdaptor adaptor, SessionMsgProcessor processor, DeviceAuthService authService, long timeout) { - super(processor, authService); + super(); Request request = exchange.advanced().getRequest(); this.token = request.getTokenString(); this.sessionId = new CoapSessionId(request.getSource().getHostAddress(), request.getSourcePort(), this.token); @@ -112,7 +112,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext { public void close() { log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut()); - processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId)); +// processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId)); } @Override diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index d26d076b21..add89b1e1a 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -73,20 +73,20 @@ public class DeviceApiController { if (quotaExceeded(httpRequest, responseWriter)) { return responseWriter; } - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); - if (ctx.login(new DeviceTokenCredentials(deviceToken))) { - GetAttributesRequest request; - if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) { - request = new BasicGetAttributesRequest(0); - } else { - Set clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null; - Set sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null; - request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet); - } - process(ctx, request); - } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); - } +// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); +// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { +// GetAttributesRequest request; +// if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) { +// request = new BasicGetAttributesRequest(0); +// } else { +// Set clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null; +// Set sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null; +// request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet); +// } +// process(ctx, request); +// } else { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); +// } return responseWriter; } @@ -98,16 +98,16 @@ public class DeviceApiController { if (quotaExceeded(request, responseWriter)) { return responseWriter; } - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); - if (ctx.login(new DeviceTokenCredentials(deviceToken))) { - try { - process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json))); - } catch (IllegalStateException | JsonSyntaxException ex) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); - } - } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); - } +// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); +// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { +// try { +// process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json))); +// } catch (IllegalStateException | JsonSyntaxException ex) { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); +// } +// } else { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); +// } return responseWriter; } @@ -119,15 +119,15 @@ public class DeviceApiController { return responseWriter; } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); - if (ctx.login(new DeviceTokenCredentials(deviceToken))) { - try { - process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json))); - } catch (IllegalStateException | JsonSyntaxException ex) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); - } - } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); - } +// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { +// try { +// process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json))); +// } catch (IllegalStateException | JsonSyntaxException ex) { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); +// } +// } else { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); +// } return responseWriter; } @@ -147,17 +147,17 @@ public class DeviceApiController { if (quotaExceeded(request, responseWriter)) { return responseWriter; } - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); - if (ctx.login(new DeviceTokenCredentials(deviceToken))) { - try { - JsonObject response = new JsonParser().parse(json).getAsJsonObject(); - process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString())); - } catch (IllegalStateException | JsonSyntaxException ex) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); - } - } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); - } +// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); +// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { +// try { +// JsonObject response = new JsonParser().parse(json).getAsJsonObject(); +// process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString())); +// } catch (IllegalStateException | JsonSyntaxException ex) { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); +// } +// } else { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); +// } return responseWriter; } @@ -169,18 +169,18 @@ public class DeviceApiController { return responseWriter; } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); - if (ctx.login(new DeviceTokenCredentials(deviceToken))) { - try { - JsonObject request = new JsonParser().parse(json).getAsJsonObject(); - process(ctx, new ToServerRpcRequestMsg(0, - request.get("method").getAsString(), - request.get("params").toString())); - } catch (IllegalStateException | JsonSyntaxException ex) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); - } - } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); - } +// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { +// try { +// JsonObject request = new JsonParser().parse(json).getAsJsonObject(); +// process(ctx, new ToServerRpcRequestMsg(0, +// request.get("method").getAsString(), +// request.get("params").toString())); +// } catch (IllegalStateException | JsonSyntaxException ex) { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); +// } +// } else { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); +// } return responseWriter; } @@ -197,16 +197,16 @@ public class DeviceApiController { if (quotaExceeded(httpRequest, responseWriter)) { return responseWriter; } - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); - if (ctx.login(new DeviceTokenCredentials(deviceToken))) { - try { - process(ctx, msg); - } catch (IllegalStateException | JsonSyntaxException ex) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); - } - } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); - } +// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); +// if (ctx.login(new DeviceTokenCredentials(deviceToken))) { +// try { +// process(ctx, msg); +// } catch (IllegalStateException | JsonSyntaxException ex) { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); +// } +// } else { +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); +// } return responseWriter; } @@ -220,7 +220,7 @@ public class DeviceApiController { private void process(HttpSessionCtx ctx, FromDeviceMsg request) { AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request); - processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); +// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); } private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) { diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java index 4732785de2..e503409b23 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java @@ -43,7 +43,7 @@ public class HttpSessionCtx extends DeviceAwareSessionContext { private final DeferredResult responseWriter; public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult responseWriter, long timeout) { - super(processor, authService); + super(); this.sessionId = new HttpSessionId(); this.responseWriter = responseWriter; this.timeout = timeout; diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index cc40ee0975..b500a31e69 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 8c64e7eeb2..a318b30e66 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -388,7 +388,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.close(); } else { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); - deviceSessionCtx.setDeviceInfo(deviceSessionCtx.getDeviceInfo()); + deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); transportService.process(getSessionEventMsg(SessionEvent.OPEN), null); checkGatewaySession(); } diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java deleted file mode 100644 index 493c0c8b9f..0000000000 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.thingsboard.server.mqtt.service; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - -import javax.annotation.Nullable; -import java.util.concurrent.Executor; -import java.util.function.Consumer; - -/** - * Created by ashvayka on 05.10.18. - */ -public class AsyncCallbackTemplate { - - public static void withCallback(ListenableFuture future, Consumer onSuccess, - Consumer onFailure) { - withCallback(future, onSuccess, onFailure, null); - } - - public static void withCallback(ListenableFuture future, Consumer onSuccess, - Consumer onFailure, Executor executor) { - FutureCallback callback = new FutureCallback() { - @Override - public void onSuccess(@Nullable T result) { - try { - onSuccess.accept(result); - } catch (Throwable th) { - onFailure(th); - } - } - - @Override - public void onFailure(Throwable t) { - onFailure.accept(t); - } - }; - if (executor != null) { - Futures.addCallback(future, callback, executor); - } else { - Futures.addCallback(future, callback); - } - } - -} diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java index a1641f152f..f5fd690170 100644 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.mqtt.service; import org.springframework.beans.factory.annotation.Autowired; @@ -5,6 +20,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.kafka.AsyncCallbackTemplate; import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.kafka.TbKafkaRequestTemplate; @@ -23,19 +39,19 @@ import java.util.concurrent.Executors; @Service public class MqttTransportService implements TransportService { - @Value("${kafka.rule-engine.topic}") + @Value("${kafka.rule_engine.topic}") private String ruleEngineTopic; - @Value("${kafka.transport-api.requests-topic}") + @Value("${kafka.transport_api.requests_topic}") private String transportApiRequestsTopic; - @Value("${kafka.transport-api.responses-topic}") + @Value("${kafka.transport_api.responses_topic}") private String transportApiResponsesTopic; - @Value("${kafka.transport-api.max_pending_requests}") + @Value("${kafka.transport_api.max_pending_requests}") private long maxPendingRequests; - @Value("${kafka.transport-api.max_requests_timeout}") + @Value("${kafka.transport_api.max_requests_timeout}") private long maxRequestsTimeout; - @Value("${kafka.transport-api.response_poll_interval}") + @Value("${kafka.transport_api.response_poll_interval}") private int responsePollDuration; - @Value("${kafka.transport-api.response_auto_commit_interval}") + @Value("${kafka.transport_api.response_auto_commit_interval}") private int autoCommitInterval; @Autowired diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java index f931db6a2e..8ae4d1711f 100644 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.mqtt.service; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java index 22e16478ae..66e64b56e6 100644 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.mqtt.service; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml index a2a0d54adc..707fb4bcc1 100644 --- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml @@ -73,13 +73,12 @@ kafka: batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" linger.ms: "${TB_KAFKA_LINGER_MS:1}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" - transport-api: - requests-topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}" - responses-topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}" + transport_api: + requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}" + responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}" max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}" max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}" response_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" - # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted - rule-engine: + rule_engine: topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"