diff --git a/application/pom.xml b/application/pom.xml
index 5c61fe0f4a..a20aeaf485 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -298,18 +298,6 @@
com.github.ua-parser
uap-java
-
- org.springframework.boot
- spring-boot-starter-actuator
-
-
- io.micrometer
- micrometer-core
-
-
- io.micrometer
- micrometer-registry-prometheus
-
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
index cc5b100f7b..b7e9e95ea4 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
@@ -31,6 +31,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
+import org.thingsboard.server.common.msg.stats.StatsFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
@@ -38,7 +39,6 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.state.DeviceStateService;
-import org.thingsboard.server.service.stats.StatsCounterFactory;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
@@ -78,14 +78,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
- consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), counterFactory));
+ consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), statsFactory));
}
submitExecutor = Executors.newSingleThreadExecutor();
}
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java
index 728f8b3ea1..cf221510bb 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java
@@ -17,12 +17,11 @@ package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.gen.transport.TransportProtos;
-import org.thingsboard.server.service.stats.StatsCounter;
-import org.thingsboard.server.service.stats.StatsCounterFactory;
-import org.thingsboard.server.service.stats.StatsType;
+import org.thingsboard.server.common.msg.stats.StatsCounter;
+import org.thingsboard.server.common.msg.stats.StatsFactory;
+import org.thingsboard.server.common.msg.stats.StatsType;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class TbCoreConsumerStats {
@@ -53,20 +52,20 @@ public class TbCoreConsumerStats {
private final List counters = new ArrayList<>();
- public TbCoreConsumerStats(StatsCounterFactory counterFactory) {
+ public TbCoreConsumerStats(StatsFactory statsFactory) {
String statsKey = StatsType.CORE.getName();
- this.totalCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
- this.sessionEventCounter = counterFactory.createStatsCounter(statsKey, SESSION_EVENTS);
- this.getAttributesCounter = counterFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
- this.subscribeToAttributesCounter = counterFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
- this.subscribeToRPCCounter = counterFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
- this.toDeviceRPCCallResponseCounter = counterFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
- this.subscriptionInfoCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
- this.claimDeviceCounter = counterFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
- this.deviceStateCounter = counterFactory.createStatsCounter(statsKey, DEVICE_STATES);
- this.subscriptionMsgCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
- this.toCoreNotificationsCounter = counterFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
+ this.totalCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
+ this.sessionEventCounter = statsFactory.createStatsCounter(statsKey, SESSION_EVENTS);
+ this.getAttributesCounter = statsFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
+ this.subscribeToAttributesCounter = statsFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
+ this.subscribeToRPCCounter = statsFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
+ this.toDeviceRPCCallResponseCounter = statsFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
+ this.subscriptionInfoCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
+ this.claimDeviceCounter = statsFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
+ this.deviceStateCounter = statsFactory.createStatsCounter(statsKey, DEVICE_STATES);
+ this.subscriptionMsgCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
+ this.toCoreNotificationsCounter = statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
counters.add(totalCounter);
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java
index 29e4c8c2a0..ef5c6fc173 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java
@@ -15,21 +15,19 @@
*/
package org.thingsboard.server.service.queue;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
+import org.thingsboard.server.common.msg.stats.StatsFactory;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
-import org.thingsboard.server.service.stats.StatsCounter;
-import org.thingsboard.server.service.stats.StatsCounterFactory;
-import org.thingsboard.server.service.stats.StatsType;
+import org.thingsboard.server.common.msg.stats.StatsCounter;
+import org.thingsboard.server.common.msg.stats.StatsType;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class TbRuleEngineConsumerStats {
@@ -60,18 +58,18 @@ public class TbRuleEngineConsumerStats {
private final String queueName;
- public TbRuleEngineConsumerStats(String queueName, StatsCounterFactory counterFactory) {
+ public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) {
this.queueName = queueName;
String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
- this.totalMsgCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
- this.successMsgCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
- this.timeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
- this.failedMsgCounter = counterFactory.createStatsCounter(statsKey, FAILED_MSGS);
- this.tmpTimeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
- this.tmpFailedMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_FAILED);
- this.successIterationsCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
- this.failedIterationsCounter = counterFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
+ this.totalMsgCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
+ this.successMsgCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
+ this.timeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
+ this.failedMsgCounter = statsFactory.createStatsCounter(statsKey, FAILED_MSGS);
+ this.tmpTimeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
+ this.tmpFailedMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_FAILED);
+ this.successIterationsCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
+ this.failedIterationsCounter = statsFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
counters.add(totalMsgCounter);
counters.add(successMsgCounter);
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java
index 24193d8d09..5b5c440f98 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java
+++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java
@@ -18,6 +18,9 @@ package org.thingsboard.server.service.stats;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.JsInvokeStats;
+import org.thingsboard.server.common.msg.stats.StatsCounter;
+import org.thingsboard.server.common.msg.stats.StatsFactory;
+import org.thingsboard.server.common.msg.stats.StatsType;
import javax.annotation.PostConstruct;
@@ -32,14 +35,14 @@ public class DefaultJsInvokeStats implements JsInvokeStats {
private StatsCounter failuresCounter;
@Autowired
- private StatsCounterFactory counterFactory;
+ private StatsFactory statsFactory;
@PostConstruct
public void init() {
String key = StatsType.JS_INVOKE.getName();
- this.requestsCounter = counterFactory.createStatsCounter(key, REQUESTS);
- this.responsesCounter = counterFactory.createStatsCounter(key, RESPONSES);
- this.failuresCounter = counterFactory.createStatsCounter(key, FAILURES);
+ this.requestsCounter = statsFactory.createStatsCounter(key, REQUESTS);
+ this.responsesCounter = statsFactory.createStatsCounter(key, RESPONSES);
+ this.failuresCounter = statsFactory.createStatsCounter(key, FAILURES);
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java
index 5b9fc91bdf..37cfb9bbe6 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java
@@ -20,6 +20,9 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.msg.stats.MessagesStats;
+import org.thingsboard.server.common.msg.stats.StatsFactory;
+import org.thingsboard.server.common.msg.stats.StatsType;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
@@ -29,10 +32,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
-import org.thingsboard.server.service.stats.DefaultQueueStats;
-import org.thingsboard.server.service.stats.StatsCounter;
-import org.thingsboard.server.service.stats.StatsCounterFactory;
-import org.thingsboard.server.service.stats.StatsType;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -45,13 +44,9 @@ import java.util.concurrent.*;
@Service
@TbCoreComponent
public class TbCoreTransportApiService {
- private static final String TOTAL_MSGS = "totalMsgs";
- private static final String SUCCESSFUL_MSGS = "successfulMsgs";
- private static final String FAILED_MSGS = "failedMsgs";
-
private final TbCoreQueueFactory tbCoreQueueFactory;
private final TransportApiService transportApiService;
- private final StatsCounterFactory counterFactory;
+ private final StatsFactory statsFactory;
@Value("${queue.transport_api.max_pending_requests:10000}")
private int maxPendingRequests;
@@ -66,10 +61,10 @@ public class TbCoreTransportApiService {
private TbQueueResponseTemplate,
TbProtoQueueMsg> transportApiTemplate;
- public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsCounterFactory counterFactory) {
+ public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsFactory statsFactory) {
this.tbCoreQueueFactory = tbCoreQueueFactory;
this.transportApiService = transportApiService;
- this.counterFactory = counterFactory;
+ this.statsFactory = statsFactory;
}
@PostConstruct
@@ -79,10 +74,7 @@ public class TbCoreTransportApiService {
TbQueueConsumer> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
String key = StatsType.TRANSPORT.getName();
- StatsCounter totalCounter = counterFactory.createStatsCounter(key, TOTAL_MSGS);
- StatsCounter successfulCounter = counterFactory.createStatsCounter(key, SUCCESSFUL_MSGS);
- StatsCounter failedCounter = counterFactory.createStatsCounter(key, FAILED_MSGS);
- DefaultQueueStats queueStats = new DefaultQueueStats(totalCounter, successfulCounter, failedCounter);
+ MessagesStats queueStats = statsFactory.createMessagesStats(key);
DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
, TbProtoQueueMsg> builder = DefaultTbQueueResponseTemplate.builder();
diff --git a/common/message/pom.xml b/common/message/pom.xml
index 066beec7a1..413ce524b0 100644
--- a/common/message/pom.xml
+++ b/common/message/pom.xml
@@ -69,6 +69,21 @@
protobuf-java
provided
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ io.micrometer
+ micrometer-core
+
+
+ io.micrometer
+ micrometer-registry-prometheus
+
+
junit
junit
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultMessagesStats.java
similarity index 81%
rename from application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java
rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultMessagesStats.java
index d6c42a82b5..29990eaea4 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultMessagesStats.java
@@ -13,16 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.stats;
+package org.thingsboard.server.common.msg.stats;
-import org.thingsboard.server.queue.stats.QueueStats;
-
-public class DefaultQueueStats implements QueueStats {
+public class DefaultMessagesStats implements MessagesStats {
private final StatsCounter totalCounter;
private final StatsCounter successfulCounter;
private final StatsCounter failedCounter;
- public DefaultQueueStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
+ public DefaultMessagesStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
this.totalCounter = totalCounter;
this.successfulCounter = successfulCounter;
this.failedCounter = failedCounter;
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultStatsFactory.java
similarity index 59%
rename from application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java
rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultStatsFactory.java
index f8276bef33..e34f6e0ff2 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultStatsFactory.java
@@ -13,19 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.stats;
+package org.thingsboard.server.common.msg.stats;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
-import org.thingsboard.server.service.metrics.StubCounter;
import java.util.concurrent.atomic.AtomicInteger;
@Service
-public class StatsCounterFactory {
+public class DefaultStatsFactory implements StatsFactory {
+ private static final String TOTAL_MSGS = "totalMsgs";
+ private static final String SUCCESSFUL_MSGS = "successfulMsgs";
+ private static final String FAILED_MSGS = "failedMsgs";
+
private static final String STATS_NAME_TAG = "statsName";
private static final Counter STUB_COUNTER = new StubCounter();
@@ -33,9 +36,10 @@ public class StatsCounterFactory {
@Autowired
private MeterRegistry meterRegistry;
- @Value("${metrics.enabled}")
+ @Value("${metrics.enabled:false}")
private Boolean metricsEnabled;
+ @Override
public StatsCounter createStatsCounter(String key, String statsName) {
return new StatsCounter(
new AtomicInteger(0),
@@ -45,4 +49,27 @@ public class StatsCounterFactory {
statsName
);
}
+
+ @Override
+ public MessagesStats createMessagesStats(String key) {
+ StatsCounter totalCounter = createStatsCounter(key, TOTAL_MSGS);
+ StatsCounter successfulCounter = createStatsCounter(key, SUCCESSFUL_MSGS);
+ StatsCounter failedCounter = createStatsCounter(key, FAILED_MSGS);
+ return new DefaultMessagesStats(totalCounter, successfulCounter, failedCounter);
+ }
+
+ private static class StubCounter implements Counter {
+ @Override
+ public void increment(double amount) {}
+
+ @Override
+ public double count() {
+ return 0;
+ }
+
+ @Override
+ public Id getId() {
+ return null;
+ }
+ }
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/MessagesStats.java
similarity index 92%
rename from common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java
rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/MessagesStats.java
index d5ea3acdf1..077d2ce01a 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/MessagesStats.java
@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.queue.stats;
+package org.thingsboard.server.common.msg.stats;
-public interface QueueStats {
+public interface MessagesStats {
default void incrementTotal() {
incrementTotal(1);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsCounter.java
similarity index 96%
rename from application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java
rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsCounter.java
index 17f27e9d46..0097949cb7 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsCounter.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.stats;
+package org.thingsboard.server.common.msg.stats;
import io.micrometer.core.instrument.Counter;
diff --git a/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsFactory.java
similarity index 65%
rename from application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java
rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsFactory.java
index f7a3117554..243c1e05a0 100644
--- a/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsFactory.java
@@ -13,21 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.metrics;
+package org.thingsboard.server.common.msg.stats;
-import io.micrometer.core.instrument.Counter;
+public interface StatsFactory {
+ StatsCounter createStatsCounter(String key, String statsName);
-public class StubCounter implements Counter {
- @Override
- public void increment(double amount) {}
-
- @Override
- public double count() {
- return 0;
- }
-
- @Override
- public Id getId() {
- return null;
- }
+ MessagesStats createMessagesStats(String key);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsType.java
similarity index 94%
rename from application/src/main/java/org/thingsboard/server/service/stats/StatsType.java
rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsType.java
index a831498993..825f9fcac8 100644
--- a/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsType.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.thingsboard.server.service.stats;
+package org.thingsboard.server.common.msg.stats;
public enum StatsType {
RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke");
diff --git a/common/queue/pom.xml b/common/queue/pom.xml
index 2072d34b40..82f62b4d2e 100644
--- a/common/queue/pom.xml
+++ b/common/queue/pom.xml
@@ -112,6 +112,7 @@
org.apache.curator
curator-recipes
+
junit
junit
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java
index 20530360d6..76c243c270 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.queue;
import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.msg.stats.MessagesStats;
public interface TbQueueRequestTemplate {
@@ -25,4 +26,5 @@ public interface TbQueueRequestTemplate requestTemplate,
@@ -153,6 +156,11 @@ public class DefaultTbQueueRequestTemplate send(Request request) {
if (tickSize > maxPendingRequests) {
@@ -166,14 +174,17 @@ public class DefaultTbQueueRequestTemplate responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
pendingRequests.putIfAbsent(requestId, responseMetaData);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
+ if (messagesStats != null) messagesStats.incrementTotal();
requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
+ if (messagesStats != null) messagesStats.incrementSuccessful();
log.trace("[{}] Request sent: {}", requestId, metadata);
}
@Override
public void onFailure(Throwable t) {
+ if (messagesStats != null) messagesStats.incrementFailed();
pendingRequests.remove(requestId);
future.setException(t);
}
diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java
index 602a180170..6e9870c7fe 100644
--- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java
@@ -23,7 +23,7 @@ import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
-import org.thingsboard.server.queue.stats.QueueStats;
+import org.thingsboard.server.common.msg.stats.MessagesStats;
import java.util.List;
import java.util.UUID;
@@ -45,7 +45,7 @@ public class DefaultTbQueueResponseTemplate();
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
index 14f01a6d63..6162337ef6 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
@@ -55,6 +55,9 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
+import org.thingsboard.server.common.msg.stats.MessagesStats;
+import org.thingsboard.server.common.msg.stats.StatsFactory;
+import org.thingsboard.server.common.msg.stats.StatsType;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -101,12 +104,17 @@ public class DefaultTransportService implements TransportService {
private final TbQueueProducerProvider producerProvider;
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
+ private final StatsFactory statsFactory;
protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate;
protected TbQueueProducer> ruleEngineMsgProducer;
protected TbQueueProducer> tbCoreMsgProducer;
protected TbQueueConsumer> transportNotificationsConsumer;
+ protected MessagesStats ruleEngineProducerStats;
+ protected MessagesStats tbCoreProducerStats;
+ protected MessagesStats transportApiStats;
+
protected ScheduledExecutorService schedulerExecutor;
protected ExecutorService transportCallbackExecutor;
@@ -119,11 +127,12 @@ public class DefaultTransportService implements TransportService {
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
private volatile boolean stopped = false;
- public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService) {
+ public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory) {
this.serviceInfoProvider = serviceInfoProvider;
this.queueProvider = queueProvider;
this.producerProvider = producerProvider;
this.partitionService = partitionService;
+ this.statsFactory = statsFactory;
}
@PostConstruct
@@ -133,10 +142,14 @@ public class DefaultTransportService implements TransportService {
new TbRateLimits(perTenantLimitsConf);
new TbRateLimits(perDevicesLimitsConf);
}
+ this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
+ this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
+ this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
+ transportApiRequestTemplate.setMessagesStats(transportApiStats);
ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
@@ -557,10 +570,14 @@ public class DefaultTransportService implements TransportService {
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg);
}
+ TransportTbQueueCallback transportTbQueueCallback = callback != null ?
+ new TransportTbQueueCallback(callback) : null;
+ tbCoreProducerStats.incrementTotal();
+ StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
tbCoreMsgProducer.send(tpi,
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
- ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
- new TransportTbQueueCallback(callback) : null);
+ ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
+ wrappedCallback);
}
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
@@ -571,7 +588,9 @@ public class DefaultTransportService implements TransportService {
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
- ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
+ ruleEngineProducerStats.incrementTotal();
+ StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);
+ ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
}
private class TransportTbQueueCallback implements TbQueueCallback {
@@ -592,6 +611,30 @@ public class DefaultTransportService implements TransportService {
}
}
+ private class StatsCallback implements TbQueueCallback {
+ private final TbQueueCallback callback;
+ private final MessagesStats stats;
+
+ private StatsCallback(TbQueueCallback callback, MessagesStats stats) {
+ this.callback = callback;
+ this.stats = stats;
+ }
+
+ @Override
+ public void onSuccess(TbQueueMsgMetadata metadata) {
+ stats.incrementSuccessful();
+ if (callback != null)
+ callback.onSuccess(metadata);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ stats.incrementFailed();
+ if (callback != null)
+ callback.onFailure(t);
+ }
+ }
+
private class MsgPackCallback implements TbQueueCallback {
private final AtomicInteger msgCount;
private final TransportServiceCallback callback;
diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml
index 4ab8921d4b..c34dba7c90 100644
--- a/transport/coap/src/main/resources/tb-coap-transport.yml
+++ b/transport/coap/src/main/resources/tb-coap-transport.yml
@@ -14,8 +14,16 @@
# limitations under the License.
#
-spring.main.web-environment: false
-spring.main.web-application-type: none
+# If you enabled process metrics you should also enable 'web-environment'.
+spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}"
+# If you enabled process metrics you should set 'web-application-type' to 'servlet' value.
+spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}"
+
+server:
+ # Server bind address (has no effect if web-environment is disabled).
+ address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
+ # Server bind port (has no effect if web-environment is disabled).
+ port: "${HTTP_BIND_PORT:8083}"
# Zookeeper connection parameters. Used for service discovery.
zk:
@@ -211,4 +219,16 @@ service:
type: "${TB_SERVICE_TYPE:tb-transport}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
- tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
+ tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
+
+
+metrics:
+ # Enable/disable actuator metrics.
+ enabled: "${METRICS_ENABLED:false}"
+
+management:
+ endpoints:
+ web:
+ exposure:
+ # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
+ include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml
index 813fd41734..377c66e711 100644
--- a/transport/http/src/main/resources/tb-http-transport.yml
+++ b/transport/http/src/main/resources/tb-http-transport.yml
@@ -212,4 +212,16 @@ service:
type: "${TB_SERVICE_TYPE:tb-transport}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
- tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
+ tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
+
+
+metrics:
+ # Enable/disable actuator metrics.
+ enabled: "${METRICS_ENABLED:false}"
+
+management:
+ endpoints:
+ web:
+ exposure:
+ # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
+ include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index bb3db211ef..bcc27d0755 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -14,8 +14,16 @@
# limitations under the License.
#
-spring.main.web-environment: false
-spring.main.web-application-type: none
+# If you enabled process metrics you should also enable 'web-environment'.
+spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}"
+# If you enabled process metrics you should set 'web-application-type' to 'servlet' value.
+spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}"
+
+server:
+ # Server bind address (has no effect if web-environment is disabled).
+ address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
+ # Server bind port (has no effect if web-environment is disabled).
+ port: "${HTTP_BIND_PORT:8083}"
# Zookeeper connection parameters. Used for service discovery.
zk:
@@ -232,4 +240,15 @@ service:
type: "${TB_SERVICE_TYPE:tb-transport}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
- tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
\ No newline at end of file
+ tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
+
+metrics:
+ # Enable/disable actuator metrics.
+ enabled: "${METRICS_ENABLED:false}"
+
+management:
+ endpoints:
+ web:
+ exposure:
+ # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
+ include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file