diff --git a/application/pom.xml b/application/pom.xml
index 61ec42e5e3..f28c607b56 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -97,6 +97,10 @@
org.thingsboard.common
queue
+
+ org.thingsboard.common
+ stats
+
org.thingsboard
dao
@@ -309,18 +313,6 @@
Java-WebSocket
test
-
- org.springframework.boot
- spring-boot-starter-actuator
-
-
- io.micrometer
- micrometer-core
-
-
- io.micrometer
- micrometer-registry-prometheus
-
diff --git a/application/src/main/data/certs/azure/BaltimoreCyberTrustRoot.crt.pem b/application/src/main/data/certs/azure/BaltimoreCyberTrustRoot.crt.pem
new file mode 100644
index 0000000000..2bd16ebd47
--- /dev/null
+++ b/application/src/main/data/certs/azure/BaltimoreCyberTrustRoot.crt.pem
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDdzCCAl+gAwIBAgIEAgAAuTANBgkqhkiG9w0BAQUFADBaMQswCQYDVQQGEwJJ
+RTESMBAGA1UEChMJQmFsdGltb3JlMRMwEQYDVQQLEwpDeWJlclRydXN0MSIwIAYD
+VQQDExlCYWx0aW1vcmUgQ3liZXJUcnVzdCBSb290MB4XDTAwMDUxMjE4NDYwMFoX
+DTI1MDUxMjIzNTkwMFowWjELMAkGA1UEBhMCSUUxEjAQBgNVBAoTCUJhbHRpbW9y
+ZTETMBEGA1UECxMKQ3liZXJUcnVzdDEiMCAGA1UEAxMZQmFsdGltb3JlIEN5YmVy
+VHJ1c3QgUm9vdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKMEuyKr
+mD1X6CZymrV51Cni4eiVgLGw41uOKymaZN+hXe2wCQVt2yguzmKiYv60iNoS6zjr
+IZ3AQSsBUnuId9Mcj8e6uYi1agnnc+gRQKfRzMpijS3ljwumUNKoUMMo6vWrJYeK
+mpYcqWe4PwzV9/lSEy/CG9VwcPCPwBLKBsua4dnKM3p31vjsufFoREJIE9LAwqSu
+XmD+tqYF/LTdB1kC1FkYmGP1pWPgkAx9XbIGevOF6uvUA65ehD5f/xXtabz5OTZy
+dc93Uk3zyZAsuT3lySNTPx8kmCFcB5kpvcY67Oduhjprl3RjM71oGDHweI12v/ye
+jl0qhqdNkNwnGjkCAwEAAaNFMEMwHQYDVR0OBBYEFOWdWTCCR1jMrPoIVDaGezq1
+BE3wMBIGA1UdEwEB/wQIMAYBAf8CAQMwDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3
+DQEBBQUAA4IBAQCFDF2O5G9RaEIFoN27TyclhAO992T9Ldcw46QQF+vaKSm2eT92
+9hkTI7gQCvlYpNRhcL0EYWoSihfVCr3FvDB81ukMJY2GQE/szKN+OMY3EU/t3Wgx
+jkzSswF07r51XgdIGn9w/xZchMB5hbgF/X++ZRGjD8ACtPhSNzkE1akxehi/oCr0
+Epn3o0WC4zxe9Z2etciefC7IpJ5OCBRLbf1wbWsaY71k5h+3zvDyny67G7fyUIhz
+ksLi4xaNmjICq44Y3ekQEe5+NauQrz4wlHrQMz2nZQ/1/I6eYs9HRCwBXbsdtTLS
+R9I4LtD+gdwyah617jzV/OeBHRnDJELqYzmp
+-----END CERTIFICATE-----
+
diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardInstallApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardInstallApplication.java
index 86eea653ae..913c0d5498 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardInstallApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardInstallApplication.java
@@ -29,7 +29,8 @@ import java.util.Arrays;
@ComponentScan({"org.thingsboard.server.install",
"org.thingsboard.server.service.component",
"org.thingsboard.server.service.install",
- "org.thingsboard.server.dao"})
+ "org.thingsboard.server.dao",
+ "org.thingsboard.server.common.stats"})
public class ThingsboardInstallApplication {
private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 4cbfff34fc..d20554357b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -131,6 +131,9 @@ class DefaultTbContext implements TbContext {
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
+ if (nodeCtx.getSelf().isDebugMode()) {
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
+ }
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure));
}
@@ -176,10 +179,10 @@ class DefaultTbContext implements TbContext {
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
}
- private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg tbMsg, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) {
+ private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) {
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
- tbMsg = TbMsg.newMsg(tbMsg, ruleChainId, ruleNodeId);
+ TbMsg tbMsg = TbMsg.newMsg(source, ruleChainId, ruleNodeId);
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
@@ -188,6 +191,10 @@ class DefaultTbContext implements TbContext {
if (failureMessage != null) {
msg.setFailureMessage(failureMessage);
}
+ if (nodeCtx.getSelf().isDebugMode()) {
+ relationTypes.forEach(relationType ->
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType));
+ }
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure));
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index 2716e36a46..7583ea553e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -62,6 +62,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
- consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), counterFactory));
+ consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), statsFactory));
}
submitExecutor = Executors.newSingleThreadExecutor();
}
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java
index 728f8b3ea1..d37801bf45 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java
@@ -17,12 +17,11 @@ package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.gen.transport.TransportProtos;
-import org.thingsboard.server.service.stats.StatsCounter;
-import org.thingsboard.server.service.stats.StatsCounterFactory;
-import org.thingsboard.server.service.stats.StatsType;
+import org.thingsboard.server.common.stats.StatsCounter;
+import org.thingsboard.server.common.stats.StatsFactory;
+import org.thingsboard.server.common.stats.StatsType;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class TbCoreConsumerStats {
@@ -53,20 +52,20 @@ public class TbCoreConsumerStats {
private final List counters = new ArrayList<>();
- public TbCoreConsumerStats(StatsCounterFactory counterFactory) {
+ public TbCoreConsumerStats(StatsFactory statsFactory) {
String statsKey = StatsType.CORE.getName();
- this.totalCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
- this.sessionEventCounter = counterFactory.createStatsCounter(statsKey, SESSION_EVENTS);
- this.getAttributesCounter = counterFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
- this.subscribeToAttributesCounter = counterFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
- this.subscribeToRPCCounter = counterFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
- this.toDeviceRPCCallResponseCounter = counterFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
- this.subscriptionInfoCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
- this.claimDeviceCounter = counterFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
- this.deviceStateCounter = counterFactory.createStatsCounter(statsKey, DEVICE_STATES);
- this.subscriptionMsgCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
- this.toCoreNotificationsCounter = counterFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
+ this.totalCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
+ this.sessionEventCounter = statsFactory.createStatsCounter(statsKey, SESSION_EVENTS);
+ this.getAttributesCounter = statsFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
+ this.subscribeToAttributesCounter = statsFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
+ this.subscribeToRPCCounter = statsFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
+ this.toDeviceRPCCallResponseCounter = statsFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
+ this.subscriptionInfoCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
+ this.claimDeviceCounter = statsFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
+ this.deviceStateCounter = statsFactory.createStatsCounter(statsKey, DEVICE_STATES);
+ this.subscriptionMsgCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
+ this.toCoreNotificationsCounter = statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
counters.add(totalCounter);
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java
index 29e4c8c2a0..41004b41f5 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java
@@ -15,21 +15,19 @@
*/
package org.thingsboard.server.service.queue;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
+import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
-import org.thingsboard.server.service.stats.StatsCounter;
-import org.thingsboard.server.service.stats.StatsCounterFactory;
-import org.thingsboard.server.service.stats.StatsType;
+import org.thingsboard.server.common.stats.StatsCounter;
+import org.thingsboard.server.common.stats.StatsType;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class TbRuleEngineConsumerStats {
@@ -60,18 +58,18 @@ public class TbRuleEngineConsumerStats {
private final String queueName;
- public TbRuleEngineConsumerStats(String queueName, StatsCounterFactory counterFactory) {
+ public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) {
this.queueName = queueName;
String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
- this.totalMsgCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
- this.successMsgCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
- this.timeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
- this.failedMsgCounter = counterFactory.createStatsCounter(statsKey, FAILED_MSGS);
- this.tmpTimeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
- this.tmpFailedMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_FAILED);
- this.successIterationsCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
- this.failedIterationsCounter = counterFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
+ this.totalMsgCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
+ this.successMsgCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
+ this.timeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
+ this.failedMsgCounter = statsFactory.createStatsCounter(statsKey, FAILED_MSGS);
+ this.tmpTimeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
+ this.tmpFailedMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_FAILED);
+ this.successIterationsCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
+ this.failedIterationsCounter = statsFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
counters.add(totalMsgCounter);
counters.add(successMsgCounter);
diff --git a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
index 65766a7063..8caeddfa2d 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
@@ -52,10 +52,10 @@ public class RawAccessJwtToken implements JwtToken, Serializable {
try {
return Jwts.parser().setSigningKey(signingKey).parseClaimsJws(this.token);
} catch (UnsupportedJwtException | MalformedJwtException | IllegalArgumentException | SignatureException ex) {
- log.error("Invalid JWT Token", ex);
+ log.debug("Invalid JWT Token", ex);
throw new BadCredentialsException("Invalid JWT token: ", ex);
} catch (ExpiredJwtException expiredEx) {
- log.info("JWT Token is expired", expiredEx);
+ log.debug("JWT Token is expired", expiredEx);
throw new JwtExpiredTokenException(this, "JWT Token expired", expiredEx);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java
index 24193d8d09..d2b0e1225d 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java
+++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java
@@ -18,6 +18,9 @@ package org.thingsboard.server.service.stats;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.JsInvokeStats;
+import org.thingsboard.server.common.stats.StatsCounter;
+import org.thingsboard.server.common.stats.StatsFactory;
+import org.thingsboard.server.common.stats.StatsType;
import javax.annotation.PostConstruct;
@@ -32,14 +35,14 @@ public class DefaultJsInvokeStats implements JsInvokeStats {
private StatsCounter failuresCounter;
@Autowired
- private StatsCounterFactory counterFactory;
+ private StatsFactory statsFactory;
@PostConstruct
public void init() {
String key = StatsType.JS_INVOKE.getName();
- this.requestsCounter = counterFactory.createStatsCounter(key, REQUESTS);
- this.responsesCounter = counterFactory.createStatsCounter(key, RESPONSES);
- this.failuresCounter = counterFactory.createStatsCounter(key, FAILURES);
+ this.requestsCounter = statsFactory.createStatsCounter(key, REQUESTS);
+ this.responsesCounter = statsFactory.createStatsCounter(key, RESPONSES);
+ this.failuresCounter = statsFactory.createStatsCounter(key, FAILURES);
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java b/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java
deleted file mode 100644
index f8276bef33..0000000000
--- a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright © 2016-2020 The Thingsboard Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.thingsboard.server.service.stats;
-
-import io.micrometer.core.instrument.Counter;
-import io.micrometer.core.instrument.MeterRegistry;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.thingsboard.server.service.metrics.StubCounter;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-@Service
-public class StatsCounterFactory {
- private static final String STATS_NAME_TAG = "statsName";
-
- private static final Counter STUB_COUNTER = new StubCounter();
-
- @Autowired
- private MeterRegistry meterRegistry;
-
- @Value("${metrics.enabled}")
- private Boolean metricsEnabled;
-
- public StatsCounter createStatsCounter(String key, String statsName) {
- return new StatsCounter(
- new AtomicInteger(0),
- metricsEnabled ?
- meterRegistry.counter(key, STATS_NAME_TAG, statsName)
- : STUB_COUNTER,
- statsName
- );
- }
-}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java
index 5b9fc91bdf..a191a3167e 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java
@@ -20,6 +20,9 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.stats.MessagesStats;
+import org.thingsboard.server.common.stats.StatsFactory;
+import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
@@ -29,10 +32,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
-import org.thingsboard.server.service.stats.DefaultQueueStats;
-import org.thingsboard.server.service.stats.StatsCounter;
-import org.thingsboard.server.service.stats.StatsCounterFactory;
-import org.thingsboard.server.service.stats.StatsType;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -45,13 +44,9 @@ import java.util.concurrent.*;
@Service
@TbCoreComponent
public class TbCoreTransportApiService {
- private static final String TOTAL_MSGS = "totalMsgs";
- private static final String SUCCESSFUL_MSGS = "successfulMsgs";
- private static final String FAILED_MSGS = "failedMsgs";
-
private final TbCoreQueueFactory tbCoreQueueFactory;
private final TransportApiService transportApiService;
- private final StatsCounterFactory counterFactory;
+ private final StatsFactory statsFactory;
@Value("${queue.transport_api.max_pending_requests:10000}")
private int maxPendingRequests;
@@ -66,10 +61,10 @@ public class TbCoreTransportApiService {
private TbQueueResponseTemplate,
TbProtoQueueMsg> transportApiTemplate;
- public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsCounterFactory counterFactory) {
+ public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsFactory statsFactory) {
this.tbCoreQueueFactory = tbCoreQueueFactory;
this.transportApiService = transportApiService;
- this.counterFactory = counterFactory;
+ this.statsFactory = statsFactory;
}
@PostConstruct
@@ -79,10 +74,7 @@ public class TbCoreTransportApiService {
TbQueueConsumer> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
String key = StatsType.TRANSPORT.getName();
- StatsCounter totalCounter = counterFactory.createStatsCounter(key, TOTAL_MSGS);
- StatsCounter successfulCounter = counterFactory.createStatsCounter(key, SUCCESSFUL_MSGS);
- StatsCounter failedCounter = counterFactory.createStatsCounter(key, FAILED_MSGS);
- DefaultQueueStats queueStats = new DefaultQueueStats(totalCounter, successfulCounter, failedCounter);
+ MessagesStats queueStats = statsFactory.createMessagesStats(key);
DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
, TbProtoQueueMsg> builder = DefaultTbQueueResponseTemplate.builder();
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 91ad6f7d6d..57906eed51 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -178,6 +178,8 @@ database:
ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # Max number of DB queries generated by single API call to fetch telemetry records
ts:
type: "${DATABASE_TS_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale)
+ latest_ts:
+ type: "${DATABASE_TS_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale)
# note: timescale works only with postgreSQL database for DATABASE_ENTITIES_TYPE.
@@ -756,7 +758,7 @@ queue:
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
- poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
+ poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
service:
type: "${TB_SERVICE_TYPE:monolith}" # monolith or tb-core or tb-rule-engine
diff --git a/common/pom.xml b/common/pom.xml
index c382903c34..ae76d4d603 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -41,6 +41,7 @@
queue
transport
dao-api
+ stats
diff --git a/common/queue/pom.xml b/common/queue/pom.xml
index 2ab2c43709..b378ad72f5 100644
--- a/common/queue/pom.xml
+++ b/common/queue/pom.xml
@@ -48,6 +48,10 @@
org.thingsboard.common
message
+
+ org.thingsboard.common
+ stats
+
org.apache.kafka
kafka-clients
@@ -112,6 +116,7 @@
org.apache.curator
curator-recipes
+
junit
junit
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java
index 20530360d6..13ef723de2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.queue;
import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.stats.MessagesStats;
public interface TbQueueRequestTemplate {
@@ -25,4 +26,5 @@ public interface TbQueueRequestTemplate requestTemplate,
@@ -153,6 +156,11 @@ public class DefaultTbQueueRequestTemplate send(Request request) {
if (tickSize > maxPendingRequests) {
@@ -166,14 +174,23 @@ public class DefaultTbQueueRequestTemplate responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
pendingRequests.putIfAbsent(requestId, responseMetaData);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
+ if (messagesStats != null) {
+ messagesStats.incrementTotal();
+ }
requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
+ if (messagesStats != null) {
+ messagesStats.incrementSuccessful();
+ }
log.trace("[{}] Request sent: {}", requestId, metadata);
}
@Override
public void onFailure(Throwable t) {
+ if (messagesStats != null) {
+ messagesStats.incrementFailed();
+ }
pendingRequests.remove(requestId);
future.setException(t);
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java
index 602a180170..741b9c0971 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java
@@ -23,7 +23,7 @@ import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
-import org.thingsboard.server.queue.stats.QueueStats;
+import org.thingsboard.server.common.stats.MessagesStats;
import java.util.List;
import java.util.UUID;
@@ -45,7 +45,7 @@ public class DefaultTbQueueResponseTemplate();
diff --git a/common/stats/pom.xml b/common/stats/pom.xml
new file mode 100644
index 0000000000..42ecacec00
--- /dev/null
+++ b/common/stats/pom.xml
@@ -0,0 +1,92 @@
+
+
+
+ 4.0.0
+
+ org.thingsboard
+ 3.1.0-SNAPSHOT
+ common
+
+ org.thingsboard.common
+ stats
+ jar
+
+ Thingsboard Server Stats
+ https://thingsboard.io
+
+
+ UTF-8
+ ${basedir}/../..
+
+
+
+
+ com.google.guava
+ guava
+ provided
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ log4j-over-slf4j
+
+
+ ch.qos.logback
+ logback-core
+
+
+ ch.qos.logback
+ logback-classic
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ io.micrometer
+ micrometer-core
+
+
+ io.micrometer
+ micrometer-registry-prometheus
+
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-all
+ test
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultCounter.java
similarity index 81%
rename from application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java
rename to common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultCounter.java
index 17f27e9d46..9934e4a46f 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java
+++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultCounter.java
@@ -13,21 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.stats;
+package org.thingsboard.server.common.stats;
import io.micrometer.core.instrument.Counter;
import java.util.concurrent.atomic.AtomicInteger;
-public class StatsCounter {
+public class DefaultCounter {
private final AtomicInteger aiCounter;
private final Counter micrometerCounter;
- private final String name;
- public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) {
+ public DefaultCounter(AtomicInteger aiCounter, Counter micrometerCounter) {
this.aiCounter = aiCounter;
this.micrometerCounter = micrometerCounter;
- this.name = name;
}
public void increment() {
@@ -47,8 +45,4 @@ public class StatsCounter {
aiCounter.addAndGet(delta);
micrometerCounter.increment(delta);
}
-
- public String getName() {
- return name;
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultMessagesStats.java
similarity index 65%
rename from application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java
rename to common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultMessagesStats.java
index d6c42a82b5..aaba689aec 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java
+++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultMessagesStats.java
@@ -13,16 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.stats;
+package org.thingsboard.server.common.stats;
-import org.thingsboard.server.queue.stats.QueueStats;
-
-public class DefaultQueueStats implements QueueStats {
+public class DefaultMessagesStats implements MessagesStats {
private final StatsCounter totalCounter;
private final StatsCounter successfulCounter;
private final StatsCounter failedCounter;
- public DefaultQueueStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
+ public DefaultMessagesStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
this.totalCounter = totalCounter;
this.successfulCounter = successfulCounter;
this.failedCounter = failedCounter;
@@ -42,4 +40,26 @@ public class DefaultQueueStats implements QueueStats {
public void incrementFailed(int amount) {
failedCounter.add(amount);
}
+
+ @Override
+ public int getTotal() {
+ return totalCounter.get();
+ }
+
+ @Override
+ public int getSuccessful() {
+ return successfulCounter.get();
+ }
+
+ @Override
+ public int getFailed() {
+ return failedCounter.get();
+ }
+
+ @Override
+ public void reset() {
+ totalCounter.clear();
+ successfulCounter.clear();
+ failedCounter.clear();
+ }
}
diff --git a/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java
new file mode 100644
index 0000000000..8ee75c859e
--- /dev/null
+++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.stats;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tags;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Service
+public class DefaultStatsFactory implements StatsFactory {
+ private static final String TOTAL_MSGS = "totalMsgs";
+ private static final String SUCCESSFUL_MSGS = "successfulMsgs";
+ private static final String FAILED_MSGS = "failedMsgs";
+
+ private static final String STATS_NAME_TAG = "statsName";
+
+ private static final Counter STUB_COUNTER = new StubCounter();
+
+ @Autowired
+ private MeterRegistry meterRegistry;
+
+ @Value("${metrics.enabled:false}")
+ private Boolean metricsEnabled;
+
+ @Override
+ public StatsCounter createStatsCounter(String key, String statsName) {
+ return new StatsCounter(
+ new AtomicInteger(0),
+ metricsEnabled ?
+ meterRegistry.counter(key, STATS_NAME_TAG, statsName)
+ : STUB_COUNTER,
+ statsName
+ );
+ }
+
+ @Override
+ public DefaultCounter createDefaultCounter(String key, String... tags) {
+ return new DefaultCounter(
+ new AtomicInteger(0),
+ metricsEnabled ?
+ meterRegistry.counter(key, tags)
+ : STUB_COUNTER
+ );
+ }
+
+ @Override
+ public T createGauge(String key, T number, String... tags) {
+ return meterRegistry.gauge(key, Tags.of(tags), number);
+ }
+
+ @Override
+ public MessagesStats createMessagesStats(String key) {
+ StatsCounter totalCounter = createStatsCounter(key, TOTAL_MSGS);
+ StatsCounter successfulCounter = createStatsCounter(key, SUCCESSFUL_MSGS);
+ StatsCounter failedCounter = createStatsCounter(key, FAILED_MSGS);
+ return new DefaultMessagesStats(totalCounter, successfulCounter, failedCounter);
+ }
+
+ private static class StubCounter implements Counter {
+ @Override
+ public void increment(double amount) {}
+
+ @Override
+ public double count() {
+ return 0;
+ }
+
+ @Override
+ public Id getId() {
+ return null;
+ }
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/MessagesStats.java
similarity index 85%
rename from common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java
rename to common/stats/src/main/java/org/thingsboard/server/common/stats/MessagesStats.java
index d5ea3acdf1..f986445566 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java
+++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/MessagesStats.java
@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.queue.stats;
+package org.thingsboard.server.common.stats;
-public interface QueueStats {
+public interface MessagesStats {
default void incrementTotal() {
incrementTotal(1);
}
@@ -28,10 +28,17 @@ public interface QueueStats {
void incrementSuccessful(int amount);
-
default void incrementFailed() {
incrementFailed(1);
}
void incrementFailed(int amount);
+
+ int getTotal();
+
+ int getSuccessful();
+
+ int getFailed();
+
+ void reset();
}
diff --git a/common/stats/src/main/java/org/thingsboard/server/common/stats/StatsCounter.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/StatsCounter.java
new file mode 100644
index 0000000000..221ca7afbb
--- /dev/null
+++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/StatsCounter.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.stats;
+
+import io.micrometer.core.instrument.Counter;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StatsCounter extends DefaultCounter {
+ private final String name;
+
+ public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) {
+ super(aiCounter, micrometerCounter);
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/StatsFactory.java
similarity index 64%
rename from application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java
rename to common/stats/src/main/java/org/thingsboard/server/common/stats/StatsFactory.java
index f7a3117554..b979d61f35 100644
--- a/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java
+++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/StatsFactory.java
@@ -13,21 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.metrics;
+package org.thingsboard.server.common.stats;
-import io.micrometer.core.instrument.Counter;
+public interface StatsFactory {
+ StatsCounter createStatsCounter(String key, String statsName);
-public class StubCounter implements Counter {
- @Override
- public void increment(double amount) {}
+ DefaultCounter createDefaultCounter(String key, String... tags);
- @Override
- public double count() {
- return 0;
- }
+ T createGauge(String key, T number, String... tags);
- @Override
- public Id getId() {
- return null;
- }
+ MessagesStats createMessagesStats(String key);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/StatsType.java
similarity index 89%
rename from application/src/main/java/org/thingsboard/server/service/stats/StatsType.java
rename to common/stats/src/main/java/org/thingsboard/server/common/stats/StatsType.java
index a831498993..430f63ecf7 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java
+++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/StatsType.java
@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.stats;
+package org.thingsboard.server.common.stats;
public enum StatsType {
- RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke");
+ RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke"), RATE_EXECUTOR("rateExecutor");
private String name;
diff --git a/common/transport/transport-api/pom.xml b/common/transport/transport-api/pom.xml
index 8b8006687e..1c8946160e 100644
--- a/common/transport/transport-api/pom.xml
+++ b/common/transport/transport-api/pom.xml
@@ -40,6 +40,10 @@
org.thingsboard.common
queue
+
+ org.thingsboard.common
+ stats
+
org.thingsboard.common
data
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
index 14f01a6d63..2ca27b9dfe 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
@@ -55,6 +55,9 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
+import org.thingsboard.server.common.stats.MessagesStats;
+import org.thingsboard.server.common.stats.StatsFactory;
+import org.thingsboard.server.common.stats.StatsType;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -101,12 +104,17 @@ public class DefaultTransportService implements TransportService {
private final TbQueueProducerProvider producerProvider;
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
+ private final StatsFactory statsFactory;
protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate;
protected TbQueueProducer> ruleEngineMsgProducer;
protected TbQueueProducer> tbCoreMsgProducer;
protected TbQueueConsumer> transportNotificationsConsumer;
+ protected MessagesStats ruleEngineProducerStats;
+ protected MessagesStats tbCoreProducerStats;
+ protected MessagesStats transportApiStats;
+
protected ScheduledExecutorService schedulerExecutor;
protected ExecutorService transportCallbackExecutor;
@@ -119,11 +127,12 @@ public class DefaultTransportService implements TransportService {
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
private volatile boolean stopped = false;
- public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService) {
+ public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory) {
this.serviceInfoProvider = serviceInfoProvider;
this.queueProvider = queueProvider;
this.producerProvider = producerProvider;
this.partitionService = partitionService;
+ this.statsFactory = statsFactory;
}
@PostConstruct
@@ -133,10 +142,14 @@ public class DefaultTransportService implements TransportService {
new TbRateLimits(perTenantLimitsConf);
new TbRateLimits(perDevicesLimitsConf);
}
+ this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
+ this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
+ this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
+ transportApiRequestTemplate.setMessagesStats(transportApiStats);
ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
@@ -557,10 +570,14 @@ public class DefaultTransportService implements TransportService {
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg);
}
+ TransportTbQueueCallback transportTbQueueCallback = callback != null ?
+ new TransportTbQueueCallback(callback) : null;
+ tbCoreProducerStats.incrementTotal();
+ StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
tbCoreMsgProducer.send(tpi,
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
- ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
- new TransportTbQueueCallback(callback) : null);
+ ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
+ wrappedCallback);
}
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
@@ -571,7 +588,9 @@ public class DefaultTransportService implements TransportService {
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
- ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
+ ruleEngineProducerStats.incrementTotal();
+ StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);
+ ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
}
private class TransportTbQueueCallback implements TbQueueCallback {
@@ -592,6 +611,30 @@ public class DefaultTransportService implements TransportService {
}
}
+ private class StatsCallback implements TbQueueCallback {
+ private final TbQueueCallback callback;
+ private final MessagesStats stats;
+
+ private StatsCallback(TbQueueCallback callback, MessagesStats stats) {
+ this.callback = callback;
+ this.stats = stats;
+ }
+
+ @Override
+ public void onSuccess(TbQueueMsgMetadata metadata) {
+ stats.incrementSuccessful();
+ if (callback != null)
+ callback.onSuccess(metadata);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ stats.incrementFailed();
+ if (callback != null)
+ callback.onFailure(t);
+ }
+ }
+
private class MsgPackCallback implements TbQueueCallback {
private final AtomicInteger msgCount;
private final TransportServiceCallback callback;
diff --git a/common/util/src/main/java/org/thingsboard/common/util/AzureIotHubUtil.java b/common/util/src/main/java/org/thingsboard/common/util/AzureIotHubUtil.java
new file mode 100644
index 0000000000..4927c4ce53
--- /dev/null
+++ b/common/util/src/main/java/org/thingsboard/common/util/AzureIotHubUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.common.util;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Base64;
+
+@Slf4j
+public final class AzureIotHubUtil {
+ private static final String BASE_DIR_PATH = System.getProperty("user.dir");
+ private static final String APP_DIR = "application";
+ private static final String SRC_DIR = "src";
+ private static final String MAIN_DIR = "main";
+ private static final String DATA_DIR = "data";
+ private static final String CERTS_DIR = "certs";
+ private static final String AZURE_DIR = "azure";
+ private static final String FILE_NAME = "BaltimoreCyberTrustRoot.crt.pem";
+
+ private static final Path FULL_FILE_PATH;
+
+ static {
+ if (BASE_DIR_PATH.endsWith("bin")) {
+ FULL_FILE_PATH = Paths.get(BASE_DIR_PATH.replaceAll("bin$", ""), DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
+ } else if (BASE_DIR_PATH.endsWith("conf")) {
+ FULL_FILE_PATH = Paths.get(BASE_DIR_PATH.replaceAll("conf$", ""), DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
+ } else {
+ FULL_FILE_PATH = Paths.get(BASE_DIR_PATH, APP_DIR, SRC_DIR, MAIN_DIR, DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
+ }
+ }
+
+ private static final long SAS_TOKEN_VALID_SECS = 365 * 24 * 60 * 60;
+ private static final long ONE_SECOND_IN_MILLISECONDS = 1000;
+
+ private static final String SAS_TOKEN_FORMAT = "SharedAccessSignature sr=%s&sig=%s&se=%s";
+
+ private static final String USERNAME_FORMAT = "%s/%s/?api-version=2018-06-30";
+
+ private AzureIotHubUtil() {
+ }
+
+ public static String buildUsername(String host, String deviceId) {
+ return String.format(USERNAME_FORMAT, host, deviceId);
+ }
+
+ public static String buildSasToken(String host, String sasKey) {
+ try {
+ final String targetUri = URLEncoder.encode(host.toLowerCase(), "UTF-8");
+ final long expiryTime = buildExpiresOn();
+ String toSign = targetUri + "\n" + expiryTime;
+ byte[] keyBytes = Base64.getDecoder().decode(sasKey.getBytes(StandardCharsets.UTF_8));
+ SecretKeySpec signingKey = new SecretKeySpec(keyBytes, "HmacSHA256");
+ Mac mac = Mac.getInstance("HmacSHA256");
+ mac.init(signingKey);
+ byte[] rawHmac = mac.doFinal(toSign.getBytes(StandardCharsets.UTF_8));
+ String signature = URLEncoder.encode(Base64.getEncoder().encodeToString(rawHmac), "UTF-8");
+ return String.format(SAS_TOKEN_FORMAT, targetUri, signature, expiryTime);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to build SAS token!!!", e);
+ }
+ }
+
+ private static long buildExpiresOn() {
+ long expiresOnDate = System.currentTimeMillis();
+ expiresOnDate += SAS_TOKEN_VALID_SECS * ONE_SECOND_IN_MILLISECONDS;
+ return expiresOnDate / ONE_SECOND_IN_MILLISECONDS;
+ }
+
+ public static String getDefaultCaCert() {
+ try {
+ return new String(Files.readAllBytes(FULL_FILE_PATH));
+ } catch (IOException e) {
+ log.error("Failed to load Default CaCert file!!! [{}]", FULL_FILE_PATH.toString());
+ throw new RuntimeException("Failed to load Default CaCert file!!!");
+ }
+ }
+
+}
diff --git a/dao/pom.xml b/dao/pom.xml
index 36b31283c7..da5f1c10c8 100644
--- a/dao/pom.xml
+++ b/dao/pom.xml
@@ -43,6 +43,10 @@
org.thingsboard.common
message
+
+ org.thingsboard.common
+ stats
+
org.thingsboard.common
dao-api
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
index f37fd5f8ec..8bdac7a777 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
@@ -23,6 +23,9 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.stats.DefaultCounter;
+import org.thingsboard.server.common.stats.StatsCounter;
+import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
import org.thingsboard.server.dao.util.AsyncTaskContext;
@@ -56,48 +59,58 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
@Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
- @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq) {
- super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq);
+ @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
+ @Autowired StatsFactory statsFactory) {
+ super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory);
this.printTenantNames = printTenantNames;
}
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
public void printStats() {
int queueSize = getQueueSize();
- int totalAddedValue = totalAdded.getAndSet(0);
- int totalLaunchedValue = totalLaunched.getAndSet(0);
- int totalReleasedValue = totalReleased.getAndSet(0);
- int totalFailedValue = totalFailed.getAndSet(0);
- int totalExpiredValue = totalExpired.getAndSet(0);
- int totalRejectedValue = totalRejected.getAndSet(0);
- int totalRateLimitedValue = totalRateLimited.getAndSet(0);
- int rateLimitedTenantsValue = rateLimitedTenants.size();
- int concurrencyLevelValue = concurrencyLevel.get();
- if (queueSize > 0 || totalAddedValue > 0 || totalLaunchedValue > 0 || totalReleasedValue > 0 ||
- totalFailedValue > 0 || totalExpiredValue > 0 || totalRejectedValue > 0 || totalRateLimitedValue > 0 || rateLimitedTenantsValue > 0
- || concurrencyLevelValue > 0) {
- log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] " +
- "totalRateLimited [{}] totalRateLimitedTenants [{}] currBuffer [{}] ",
- queueSize, totalAddedValue, totalLaunchedValue, totalReleasedValue,
- totalFailedValue, totalExpiredValue, totalRejectedValue, totalRateLimitedValue, rateLimitedTenantsValue, concurrencyLevelValue);
+ int rateLimitedTenantsCount = (int) stats.getRateLimitedTenants().values().stream()
+ .filter(defaultCounter -> defaultCounter.get() > 0)
+ .count();
+
+ if (queueSize > 0
+ || rateLimitedTenantsCount > 0
+ || concurrencyLevel.get() > 0
+ || stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
+ ) {
+ StringBuilder statsBuilder = new StringBuilder();
+
+ statsBuilder.append("queueSize").append(" = [").append(queueSize).append("] ");
+ stats.getStatsCounters().forEach(counter -> {
+ statsBuilder.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
+ });
+ statsBuilder.append("totalRateLimitedTenants").append(" = [").append(rateLimitedTenantsCount).append("] ");
+ statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
+
+ stats.getStatsCounters().forEach(StatsCounter::clear);
+ log.info("Permits {}", statsBuilder);
}
- rateLimitedTenants.forEach(((tenantId, counter) -> {
- if (printTenantNames) {
- String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
- try {
- return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
- } catch (Exception e) {
- log.error("[{}] Failed to get tenant name", tenantId, e);
- return "N/A";
+ stats.getRateLimitedTenants().entrySet().stream()
+ .filter(entry -> entry.getValue().get() > 0)
+ .forEach(entry -> {
+ TenantId tenantId = entry.getKey();
+ DefaultCounter counter = entry.getValue();
+ int rateLimitedRequests = counter.get();
+ counter.clear();
+ if (printTenantNames) {
+ String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
+ try {
+ return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
+ } catch (Exception e) {
+ log.error("[{}] Failed to get tenant name", tenantId, e);
+ return "N/A";
+ }
+ });
+ log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests);
+ } else {
+ log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
}
});
- log.info("[{}][{}] Rate limited requests: {}", tenantId, name, counter);
- } else {
- log.info("[{}] Rate limited requests: {}", tenantId, counter);
- }
- }));
- rateLimitedTenants.clear();
}
@PreDestroy
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java
index 3554fe7bce..22fda66759 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java
@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
+import org.thingsboard.server.common.stats.MessagesStats;
import java.util.ArrayList;
import java.util.List;
@@ -27,7 +28,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -35,15 +35,14 @@ import java.util.stream.Collectors;
public class TbSqlBlockingQueue implements TbSqlQueue {
private final BlockingQueue> queue = new LinkedBlockingQueue<>();
- private final AtomicInteger addedCount = new AtomicInteger();
- private final AtomicInteger savedCount = new AtomicInteger();
- private final AtomicInteger failedCount = new AtomicInteger();
private final TbSqlBlockingQueueParams params;
private ExecutorService executor;
+ private final MessagesStats stats;
- public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) {
+ public TbSqlBlockingQueue(TbSqlBlockingQueueParams params, MessagesStats stats) {
this.params = params;
+ this.stats = stats;
}
@Override
@@ -68,7 +67,7 @@ public class TbSqlBlockingQueue implements TbSqlQueue {
log.debug("[{}] Going to save {} entities", logName, entities.size());
saveFunction.accept(entities.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList()));
entities.forEach(v -> v.getFuture().set(null));
- savedCount.addAndGet(entities.size());
+ stats.incrementSuccessful(entities.size());
if (!fullPack) {
long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs);
if (remainingDelay > 0) {
@@ -76,7 +75,7 @@ public class TbSqlBlockingQueue implements TbSqlQueue {
}
}
} catch (Exception e) {
- failedCount.addAndGet(entities.size());
+ stats.incrementFailed(entities.size());
entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e));
if (e instanceof InterruptedException) {
log.info("[{}] Queue polling was interrupted", logName);
@@ -91,9 +90,10 @@ public class TbSqlBlockingQueue implements TbSqlQueue {
});
logExecutor.scheduleAtFixedRate(() -> {
- if (queue.size() > 0 || addedCount.get() > 0 || savedCount.get() > 0 || failedCount.get() > 0) {
+ if (queue.size() > 0 || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) {
log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
- params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
+ params.getLogName(), queue.size(), stats.getTotal(), stats.getSuccessful(), stats.getFailed());
+ stats.reset();
}
}, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
}
@@ -109,7 +109,7 @@ public class TbSqlBlockingQueue implements TbSqlQueue {
public ListenableFuture add(E element) {
SettableFuture future = SettableFuture.create();
queue.add(new TbSqlQueueElement<>(future, element));
- addedCount.incrementAndGet();
+ stats.incrementTotal();
return future;
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java
index 9e7c76ace5..a63461787e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java
@@ -18,6 +18,8 @@ package org.thingsboard.server.dao.sql;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.common.stats.MessagesStats;
+import org.thingsboard.server.common.stats.StatsFactory;
@Slf4j
@Data
@@ -28,4 +30,5 @@ public class TbSqlBlockingQueueParams {
private final int batchSize;
private final long maxDelay;
private final long statsPrintIntervalMs;
+ private final String statsNamePrefix;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java
index 2c53cc4ad9..d9596a6efd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueWrapper.java
@@ -18,6 +18,8 @@ package org.thingsboard.server.dao.sql;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.common.stats.MessagesStats;
+import org.thingsboard.server.common.stats.StatsFactory;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -32,10 +34,12 @@ public class TbSqlBlockingQueueWrapper {
private ScheduledLogExecutorComponent logExecutor;
private final Function hashCodeFunction;
private final int maxThreads;
+ private final StatsFactory statsFactory;
public void init(ScheduledLogExecutorComponent logExecutor, Consumer> saveFunction) {
for (int i = 0; i < maxThreads; i++) {
- TbSqlBlockingQueue queue = new TbSqlBlockingQueue<>(params);
+ MessagesStats stats = statsFactory.createMessagesStats(params.getStatsNamePrefix() + ".queue." + i);
+ TbSqlBlockingQueue queue = new TbSqlBlockingQueue<>(params, stats);
queues.add(queue);
queue.init(logExecutor, saveFunction, i);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
index c8d7e3cb21..dedec0bc07 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
@@ -25,6 +25,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.attributes.AttributesDao;
import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
@@ -57,6 +58,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
@Autowired
private AttributeKvInsertRepository attributeKvInsertRepository;
+ @Autowired
+ private StatsFactory statsFactory;
+
@Value("${sql.attributes.batch_size:1000}")
private int batchSize;
@@ -78,10 +82,11 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
.batchSize(batchSize)
.maxDelay(maxDelay)
.statsPrintIntervalMs(statsPrintIntervalMs)
+ .statsNamePrefix("attributes")
.build();
Function hashcodeFunction = entity -> entity.getId().getEntityId().hashCode();
- queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads);
+ queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads, statsFactory);
queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v));
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java
index fcc73ae5d0..1d8194d805 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java
@@ -30,8 +30,10 @@ import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
+import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
@@ -57,6 +59,8 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
protected InsertTsRepository insertRepository;
protected TbSqlBlockingQueueWrapper tsQueue;
+ @Autowired
+ private StatsFactory statsFactory;
@PostConstruct
protected void init() {
@@ -66,10 +70,11 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
.batchSize(tsBatchSize)
.maxDelay(tsMaxDelay)
.statsPrintIntervalMs(tsStatsPrintIntervalMs)
+ .statsNamePrefix("ts")
.build();
Function hashcodeFunction = entity -> entity.getEntityId().hashCode();
- tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads);
+ tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads, statsFactory);
tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java
index 75b3d6951b..9710edad4a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java
@@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
@@ -102,6 +103,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
@Autowired
protected ScheduledLogExecutorComponent logExecutor;
+ @Autowired
+ private StatsFactory statsFactory;
+
@Value("${sql.ts.batch_size:1000}")
protected int tsBatchSize;
@@ -124,10 +128,11 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
.batchSize(tsLatestBatchSize)
.maxDelay(tsLatestMaxDelay)
.statsPrintIntervalMs(tsLatestStatsPrintIntervalMs)
+ .statsNamePrefix("ts.latest")
.build();
java.util.function.Function hashcodeFunction = entity -> entity.getEntityId().hashCode();
- tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads);
+ tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads, statsFactory);
tsLatestQueue.init(logExecutor, v -> {
Map trueLatest = new HashMap<>();
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java
index 0a9c5de07d..738c98d3b7 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java
@@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
@@ -61,6 +62,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
@Autowired
private AggregationRepository aggregationRepository;
+ @Autowired
+ private StatsFactory statsFactory;
+
@Autowired
protected InsertTsRepository insertRepository;
@@ -74,10 +78,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
.batchSize(tsBatchSize)
.maxDelay(tsMaxDelay)
.statsPrintIntervalMs(tsStatsPrintIntervalMs)
+ .statsNamePrefix("ts.timescale")
.build();
Function hashcodeFunction = entity -> entity.getEntityId().hashCode();
- tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads);
+ tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads, statsFactory);
tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
index b840586a67..d3a8ebbac6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
@@ -30,6 +30,8 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.stats.StatsFactory;
+import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
@@ -53,6 +55,8 @@ import java.util.regex.Matcher;
@Slf4j
public abstract class AbstractBufferedRateExecutor, V> implements BufferedRateExecutor {
+ public static final String CONCURRENCY_LEVEL = "currBuffer";
+
private final long maxWaitTime;
private final long pollMs;
private final BlockingQueue> queue;
@@ -64,20 +68,14 @@ public abstract class AbstractBufferedRateExecutor perTenantLimits = new ConcurrentHashMap<>();
- protected final ConcurrentMap rateLimitedTenants = new ConcurrentHashMap<>();
- protected final AtomicInteger concurrencyLevel = new AtomicInteger();
- protected final AtomicInteger totalAdded = new AtomicInteger();
- protected final AtomicInteger totalLaunched = new AtomicInteger();
- protected final AtomicInteger totalReleased = new AtomicInteger();
- protected final AtomicInteger totalFailed = new AtomicInteger();
- protected final AtomicInteger totalExpired = new AtomicInteger();
- protected final AtomicInteger totalRejected = new AtomicInteger();
- protected final AtomicInteger totalRateLimited = new AtomicInteger();
- protected final AtomicInteger printQueriesIdx = new AtomicInteger();
+ private final AtomicInteger printQueriesIdx = new AtomicInteger(0);
+
+ protected final AtomicInteger concurrencyLevel;
+ protected final BufferedRateExecutorStats stats;
public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs,
- boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq) {
+ boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq, StatsFactory statsFactory) {
this.maxWaitTime = maxWaitTime;
this.pollMs = pollMs;
this.concurrencyLimit = concurrencyLimit;
@@ -88,6 +86,10 @@ public abstract class AbstractBufferedRateExecutor new TbRateLimits(perTenantLimitsConfiguration));
if (!rateLimits.tryConsume()) {
- rateLimitedTenants.computeIfAbsent(task.getTenantId(), tId -> new AtomicInteger(0)).incrementAndGet();
- totalRateLimited.incrementAndGet();
+ stats.incrementRateLimitedTenant(task.getTenantId());
+ stats.getTotalRateLimited().increment();
settableFuture.setException(new TenantRateLimitException());
perTenantLimitReached = true;
}
@@ -113,10 +115,10 @@ public abstract class AbstractBufferedRateExecutor(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis()));
} catch (IllegalStateException e) {
- totalRejected.incrementAndGet();
+ stats.getTotalRejected().increment();
settableFuture.setException(e);
}
}
@@ -161,14 +163,14 @@ public abstract class AbstractBufferedRateExecutor 0) {
- totalLaunched.incrementAndGet();
+ stats.getTotalLaunched().increment();
ListenableFuture result = execute(finalTaskCtx);
result = Futures.withTimeout(result, timeout, TimeUnit.MILLISECONDS, timeoutExecutor);
Futures.addCallback(result, new FutureCallback() {
@Override
public void onSuccess(@Nullable V result) {
logTask("Releasing", finalTaskCtx);
- totalReleased.incrementAndGet();
+ stats.getTotalReleased().increment();
concurrencyLevel.decrementAndGet();
finalTaskCtx.getFuture().set(result);
}
@@ -180,7 +182,7 @@ public abstract class AbstractBufferedRateExecutor rateLimitedTenants = new ConcurrentHashMap<>();
+
+ private final List statsCounters = new ArrayList<>();
+
+ private final StatsCounter totalAdded;
+ private final StatsCounter totalLaunched;
+ private final StatsCounter totalReleased;
+ private final StatsCounter totalFailed;
+ private final StatsCounter totalExpired;
+ private final StatsCounter totalRejected;
+ private final StatsCounter totalRateLimited;
+
+ public BufferedRateExecutorStats(StatsFactory statsFactory) {
+ this.statsFactory = statsFactory;
+
+ String key = StatsType.RATE_EXECUTOR.getName();
+
+ this.totalAdded = statsFactory.createStatsCounter(key, TOTAL_ADDED);
+ this.totalLaunched = statsFactory.createStatsCounter(key, TOTAL_LAUNCHED);
+ this.totalReleased = statsFactory.createStatsCounter(key, TOTAL_RELEASED);
+ this.totalFailed = statsFactory.createStatsCounter(key, TOTAL_FAILED);
+ this.totalExpired = statsFactory.createStatsCounter(key, TOTAL_EXPIRED);
+ this.totalRejected = statsFactory.createStatsCounter(key, TOTAL_REJECTED);
+ this.totalRateLimited = statsFactory.createStatsCounter(key, TOTAL_RATE_LIMITED);
+
+ this.statsCounters.add(totalAdded);
+ this.statsCounters.add(totalLaunched);
+ this.statsCounters.add(totalReleased);
+ this.statsCounters.add(totalFailed);
+ this.statsCounters.add(totalExpired);
+ this.statsCounters.add(totalRejected);
+ this.statsCounters.add(totalRateLimited);
+ }
+
+ public void incrementRateLimitedTenant(TenantId tenantId){
+ rateLimitedTenants.computeIfAbsent(tenantId,
+ tId -> {
+ String key = StatsType.RATE_EXECUTOR.getName() + ".tenant";
+ return statsFactory.createDefaultCounter(key, TENANT_ID_TAG, tId.toString());
+ }
+ )
+ .increment();
+ }
+}
diff --git a/pom.xml b/pom.xml
index b233230325..29ec17bb0b 100755
--- a/pom.xml
+++ b/pom.xml
@@ -842,6 +842,11 @@
queue
${project.version}
+
+ org.thingsboard.common
+ stats
+ ${project.version}
+
org.thingsboard
tools
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
index 7e27ec496e..1f82a0dc81 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
@@ -16,8 +16,6 @@
package org.thingsboard.rule.engine.mqtt;
import io.netty.buffer.Unpooled;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
@@ -36,7 +34,6 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.net.ssl.SSLException;
import java.nio.charset.Charset;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -57,14 +54,14 @@ public class TbMqttNode implements TbNode {
private static final String ERROR = "error";
- private TbMqttNodeConfiguration config;
+ protected TbMqttNodeConfiguration mqttNodeConfiguration;
- private MqttClient mqttClient;
+ protected MqttClient mqttClient;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
try {
- this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
+ this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
this.mqttClient = initClient(ctx);
} catch (Exception e) {
throw new TbNodeException(e);
@@ -73,7 +70,7 @@ public class TbMqttNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
- String topic = TbNodeUtils.processPattern(this.config.getTopicPattern(), msg.getMetaData());
+ String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData());
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
.addListener(future -> {
if (future.isSuccess()) {
@@ -99,38 +96,38 @@ public class TbMqttNode implements TbNode {
}
}
- private MqttClient initClient(TbContext ctx) throws Exception {
+ protected MqttClient initClient(TbContext ctx) throws Exception {
Optional sslContextOpt = initSslContext();
MqttClientConfig config = sslContextOpt.isPresent() ? new MqttClientConfig(sslContextOpt.get()) : new MqttClientConfig();
- if (!StringUtils.isEmpty(this.config.getClientId())) {
- config.setClientId(this.config.getClientId());
+ if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
+ config.setClientId(this.mqttNodeConfiguration.getClientId());
}
- config.setCleanSession(this.config.isCleanSession());
- this.config.getCredentials().configure(config);
+ config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
+ this.mqttNodeConfiguration.getCredentials().configure(config);
MqttClient client = MqttClient.create(config, null);
client.setEventLoop(ctx.getSharedEventLoop());
- Future connectFuture = client.connect(this.config.getHost(), this.config.getPort());
+ Future connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
MqttConnectResult result;
try {
- result = connectFuture.get(this.config.getConnectTimeoutSec(), TimeUnit.SECONDS);
+ result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);
} catch (TimeoutException ex) {
connectFuture.cancel(true);
client.disconnect();
- String hostPort = this.config.getHost() + ":" + this.config.getPort();
+ String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s.", hostPort));
}
if (!result.isSuccess()) {
connectFuture.cancel(true);
client.disconnect();
- String hostPort = this.config.getHost() + ":" + this.config.getPort();
+ String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s. Result code is: %s", hostPort, result.getReturnCode()));
}
return client;
}
private Optional initSslContext() throws SSLException {
- Optional result = this.config.getCredentials().initSslContext();
- if (this.config.isSsl() && !result.isPresent()) {
+ Optional result = this.mqttNodeConfiguration.getCredentials().initSslContext();
+ if (this.mqttNodeConfiguration.isSsl() && !result.isPresent()) {
result = Optional.of(SslContextBuilder.forClient().build());
}
return result;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java
new file mode 100644
index 0000000000..f04e0fec96
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/AzureIotHubSasCredentials.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.mqtt.azure;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.thingsboard.common.util.AzureIotHubUtil;
+import org.thingsboard.mqtt.MqttClientConfig;
+import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
+
+import javax.net.ssl.TrustManagerFactory;
+import java.io.ByteArrayInputStream;
+import java.security.KeyStore;
+import java.security.Security;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Optional;
+
+@Data
+@Slf4j
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AzureIotHubSasCredentials implements MqttClientCredentials {
+ private String sasKey;
+ private String caCert;
+
+ @Override
+ public Optional initSslContext() {
+ try {
+ Security.addProvider(new BouncyCastleProvider());
+ if (caCert == null || caCert.isEmpty()) {
+ caCert = AzureIotHubUtil.getDefaultCaCert();
+ }
+ return Optional.of(SslContextBuilder.forClient()
+ .trustManager(createAndInitTrustManagerFactory())
+ .clientAuth(ClientAuth.REQUIRE)
+ .build());
+ } catch (Exception e) {
+ log.error("[{}] Creating TLS factory failed!", caCert, e);
+ throw new RuntimeException("Creating TLS factory failed!", e);
+ }
+ }
+
+ @Override
+ public void configure(MqttClientConfig config) {
+ }
+
+ private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception {
+ X509Certificate caCertHolder;
+ caCertHolder = readCertFile(caCert);
+
+ KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ caKeyStore.load(null, null);
+ caKeyStore.setCertificateEntry("caCert-cert", caCertHolder);
+
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(caKeyStore);
+ return trustManagerFactory;
+ }
+
+ private X509Certificate readCertFile(String fileContent) throws Exception {
+ X509Certificate certificate = null;
+ if (fileContent != null && !fileContent.trim().isEmpty()) {
+ fileContent = fileContent.replace("-----BEGIN CERTIFICATE-----", "")
+ .replace("-----END CERTIFICATE-----", "")
+ .replaceAll("\\s", "");
+ byte[] decoded = Base64.decodeBase64(fileContent);
+ CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
+ certificate = (X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(decoded));
+ }
+ return certificate;
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java
new file mode 100644
index 0000000000..7701f45a3d
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.mqtt.azure;
+
+import io.netty.handler.codec.mqtt.MqttVersion;
+import io.netty.handler.ssl.SslContext;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.common.util.AzureIotHubUtil;
+import org.thingsboard.mqtt.MqttClientConfig;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.rule.engine.mqtt.TbMqttNode;
+import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
+import org.thingsboard.rule.engine.mqtt.credentials.CertPemClientCredentials;
+import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+
+import java.util.Optional;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.EXTERNAL,
+ name = "azure iot hub",
+ configClazz = TbAzureIotHubNodeConfiguration.class,
+ nodeDescription = "Publish messages to the Azure IoT Hub",
+ nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS AT_LEAST_ONCE.",
+ uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
+ configDirective = "tbActionNodeAzureIotHubConfig"
+)
+public class TbAzureIotHubNode extends TbMqttNode {
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ try {
+ this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
+ mqttNodeConfiguration.setPort(8883);
+ mqttNodeConfiguration.setCleanSession(true);
+ MqttClientCredentials credentials = mqttNodeConfiguration.getCredentials();
+ mqttNodeConfiguration.setCredentials(new MqttClientCredentials() {
+ @Override
+ public Optional initSslContext() {
+ if (credentials instanceof AzureIotHubSasCredentials) {
+ AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
+ if (sasCredentials.getCaCert() == null || sasCredentials.getCaCert().isEmpty()) {
+ sasCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
+ }
+ } else if (credentials instanceof CertPemClientCredentials) {
+ CertPemClientCredentials pemCredentials = (CertPemClientCredentials) credentials;
+ if (pemCredentials.getCaCert() == null || pemCredentials.getCaCert().isEmpty()) {
+ pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
+ }
+ }
+ return credentials.initSslContext();
+ }
+
+ @Override
+ public void configure(MqttClientConfig config) {
+ config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
+ config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
+ if (credentials instanceof AzureIotHubSasCredentials) {
+ AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
+ config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), sasCredentials.getSasKey()));
+ }
+ }
+ });
+
+ this.mqttClient = initClient(ctx);
+ } catch (Exception e) {
+ throw new TbNodeException(e);
+ } }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeConfiguration.java
new file mode 100644
index 0000000000..6c932d9b73
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNodeConfiguration.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2020 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.mqtt.azure;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
+import org.thingsboard.rule.engine.mqtt.credentials.AnonymousCredentials;
+import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
+
+@Data
+public class TbAzureIotHubNodeConfiguration extends TbMqttNodeConfiguration {
+
+ @Override
+ public TbAzureIotHubNodeConfiguration defaultConfiguration() {
+ TbAzureIotHubNodeConfiguration configuration = new TbAzureIotHubNodeConfiguration();
+ configuration.setTopicPattern("devices//messages/events/");
+ configuration.setHost(".azure-devices.net");
+ configuration.setPort(8883);
+ configuration.setConnectTimeoutSec(10);
+ configuration.setCleanSession(true);
+ configuration.setSsl(true);
+ configuration.setCredentials(new AzureIotHubSasCredentials());
+ return configuration;
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java
index da964d3443..3dcfc2dff8 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java
@@ -166,7 +166,7 @@ public class CertPemClientCredentials implements MqttClientCredentials {
private KeySpec getKeySpec(byte[] encodedKey) throws Exception {
KeySpec keySpec;
- if (password == null) {
+ if (password == null || password.isEmpty()) {
keySpec = new PKCS8EncodedKeySpec(encodedKey);
} else {
PBEKeySpec pbeKeySpec = new PBEKeySpec(password.toCharArray());
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java
index a2137e863c..1c397614ea 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java
@@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.netty.handler.ssl.SslContext;
import org.thingsboard.mqtt.MqttClientConfig;
+import org.thingsboard.rule.engine.mqtt.azure.AzureIotHubSasCredentials;
import java.util.Optional;
@@ -29,6 +30,7 @@ import java.util.Optional;
@JsonSubTypes({
@JsonSubTypes.Type(value = AnonymousCredentials.class, name = "anonymous"),
@JsonSubTypes.Type(value = BasicCredentials.class, name = "basic"),
+ @JsonSubTypes.Type(value = AzureIotHubSasCredentials.class, name = "sas"),
@JsonSubTypes.Type(value = CertPemClientCredentials.class, name = "cert.PEM")})
public interface MqttClientCredentials {
diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml
index 621896cc06..c34dba7c90 100644
--- a/transport/coap/src/main/resources/tb-coap-transport.yml
+++ b/transport/coap/src/main/resources/tb-coap-transport.yml
@@ -14,8 +14,16 @@
# limitations under the License.
#
-spring.main.web-environment: false
-spring.main.web-application-type: none
+# If you enabled process metrics you should also enable 'web-environment'.
+spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}"
+# If you enabled process metrics you should set 'web-application-type' to 'servlet' value.
+spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}"
+
+server:
+ # Server bind address (has no effect if web-environment is disabled).
+ address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
+ # Server bind port (has no effect if web-environment is disabled).
+ port: "${HTTP_BIND_PORT:8083}"
# Zookeeper connection parameters. Used for service discovery.
zk:
@@ -205,10 +213,22 @@ queue:
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
- poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
+ poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
service:
type: "${TB_SERVICE_TYPE:tb-transport}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
- tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
+ tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
+
+
+metrics:
+ # Enable/disable actuator metrics.
+ enabled: "${METRICS_ENABLED:false}"
+
+management:
+ endpoints:
+ web:
+ exposure:
+ # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
+ include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml
index 15fe5c61b5..377c66e711 100644
--- a/transport/http/src/main/resources/tb-http-transport.yml
+++ b/transport/http/src/main/resources/tb-http-transport.yml
@@ -206,10 +206,22 @@ queue:
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
- poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
+ poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
service:
type: "${TB_SERVICE_TYPE:tb-transport}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
- tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
+ tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
+
+
+metrics:
+ # Enable/disable actuator metrics.
+ enabled: "${METRICS_ENABLED:false}"
+
+management:
+ endpoints:
+ web:
+ exposure:
+ # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
+ include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index 579a73e508..bcc27d0755 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -14,8 +14,16 @@
# limitations under the License.
#
-spring.main.web-environment: false
-spring.main.web-application-type: none
+# If you enabled process metrics you should also enable 'web-environment'.
+spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}"
+# If you enabled process metrics you should set 'web-application-type' to 'servlet' value.
+spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}"
+
+server:
+ # Server bind address (has no effect if web-environment is disabled).
+ address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
+ # Server bind port (has no effect if web-environment is disabled).
+ port: "${HTTP_BIND_PORT:8083}"
# Zookeeper connection parameters. Used for service discovery.
zk:
@@ -226,10 +234,21 @@ queue:
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
- poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
+ poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
service:
type: "${TB_SERVICE_TYPE:tb-transport}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
- tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
+ tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
+
+metrics:
+ # Enable/disable actuator metrics.
+ enabled: "${METRICS_ENABLED:false}"
+
+management:
+ endpoints:
+ web:
+ exposure:
+ # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
+ include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file