From 1700cf77077937d5f66e793ff4ddded5e7fe4629 Mon Sep 17 00:00:00 2001 From: vzikratyi Date: Fri, 10 Jul 2020 13:01:39 +0300 Subject: [PATCH] Added producer stats to 'transport' services --- application/pom.xml | 12 ----- .../queue/DefaultTbCoreConsumerService.java | 6 +-- .../DefaultTbRuleEngineConsumerService.java | 10 ++-- .../service/queue/TbCoreConsumerStats.java | 31 ++++++----- .../queue/TbRuleEngineConsumerStats.java | 26 +++++----- .../service/stats/DefaultJsInvokeStats.java | 11 ++-- .../transport/TbCoreTransportApiService.java | 22 +++----- common/message/pom.xml | 15 ++++++ .../msg/stats/DefaultMessagesStats.java | 8 ++- .../common/msg/stats/DefaultStatsFactory.java | 35 +++++++++++-- .../common/msg/stats/MessagesStats.java} | 4 +- .../common/msg}/stats/StatsCounter.java | 2 +- .../server/common/msg/stats/StatsFactory.java | 19 ++----- .../server/common/msg}/stats/StatsType.java | 2 +- common/queue/pom.xml | 1 + .../server/queue/TbQueueRequestTemplate.java | 2 + .../common/DefaultTbQueueRequestTemplate.java | 11 ++++ .../DefaultTbQueueResponseTemplate.java | 6 +-- .../service/DefaultTransportService.java | 51 +++++++++++++++++-- .../src/main/resources/tb-coap-transport.yml | 26 ++++++++-- .../src/main/resources/tb-http-transport.yml | 14 ++++- .../src/main/resources/tb-mqtt-transport.yml | 25 +++++++-- 22 files changed, 228 insertions(+), 111 deletions(-) rename application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java => common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultMessagesStats.java (81%) rename application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java => common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultStatsFactory.java (59%) rename common/{queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java => message/src/main/java/org/thingsboard/server/common/msg/stats/MessagesStats.java} (92%) rename {application/src/main/java/org/thingsboard/server/service => common/message/src/main/java/org/thingsboard/server/common/msg}/stats/StatsCounter.java (96%) rename application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java => common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsFactory.java (65%) rename {application/src/main/java/org/thingsboard/server/service => common/message/src/main/java/org/thingsboard/server/common/msg}/stats/StatsType.java (94%) diff --git a/application/pom.xml b/application/pom.xml index 5c61fe0f4a..a20aeaf485 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -298,18 +298,6 @@ com.github.ua-parser uap-java - - org.springframework.boot - spring-boot-starter-actuator - - - io.micrometer - micrometer-core - - - io.micrometer - micrometer-registry-prometheus - diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index cc5b100f7b..b7e9e95ea4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -31,6 +31,7 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; +import org.thingsboard.server.common.msg.stats.StatsFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; @@ -38,7 +39,6 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.state.DeviceStateService; -import org.thingsboard.server.service.stats.StatsCounterFactory; import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; @@ -78,14 +78,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration)); - consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), counterFactory)); + consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), statsFactory)); } submitExecutor = Executors.newSingleThreadExecutor(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 728f8b3ea1..cf221510bb 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -17,12 +17,11 @@ package org.thingsboard.server.service.queue; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.service.stats.StatsCounter; -import org.thingsboard.server.service.stats.StatsCounterFactory; -import org.thingsboard.server.service.stats.StatsType; +import org.thingsboard.server.common.msg.stats.StatsCounter; +import org.thingsboard.server.common.msg.stats.StatsFactory; +import org.thingsboard.server.common.msg.stats.StatsType; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class TbCoreConsumerStats { @@ -53,20 +52,20 @@ public class TbCoreConsumerStats { private final List counters = new ArrayList<>(); - public TbCoreConsumerStats(StatsCounterFactory counterFactory) { + public TbCoreConsumerStats(StatsFactory statsFactory) { String statsKey = StatsType.CORE.getName(); - this.totalCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS); - this.sessionEventCounter = counterFactory.createStatsCounter(statsKey, SESSION_EVENTS); - this.getAttributesCounter = counterFactory.createStatsCounter(statsKey, GET_ATTRIBUTE); - this.subscribeToAttributesCounter = counterFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES); - this.subscribeToRPCCounter = counterFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES); - this.toDeviceRPCCallResponseCounter = counterFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES); - this.subscriptionInfoCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO); - this.claimDeviceCounter = counterFactory.createStatsCounter(statsKey, DEVICE_CLAIMS); - this.deviceStateCounter = counterFactory.createStatsCounter(statsKey, DEVICE_STATES); - this.subscriptionMsgCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS); - this.toCoreNotificationsCounter = counterFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS); + this.totalCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS); + this.sessionEventCounter = statsFactory.createStatsCounter(statsKey, SESSION_EVENTS); + this.getAttributesCounter = statsFactory.createStatsCounter(statsKey, GET_ATTRIBUTE); + this.subscribeToAttributesCounter = statsFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES); + this.subscribeToRPCCounter = statsFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES); + this.toDeviceRPCCallResponseCounter = statsFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES); + this.subscriptionInfoCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO); + this.claimDeviceCounter = statsFactory.createStatsCounter(statsKey, DEVICE_CLAIMS); + this.deviceStateCounter = statsFactory.createStatsCounter(statsKey, DEVICE_STATES); + this.subscriptionMsgCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS); + this.toCoreNotificationsCounter = statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS); counters.add(totalCounter); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java index 29e4c8c2a0..ef5c6fc173 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java @@ -15,21 +15,19 @@ */ package org.thingsboard.server.service.queue; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.common.msg.stats.StatsFactory; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; -import org.thingsboard.server.service.stats.StatsCounter; -import org.thingsboard.server.service.stats.StatsCounterFactory; -import org.thingsboard.server.service.stats.StatsType; +import org.thingsboard.server.common.msg.stats.StatsCounter; +import org.thingsboard.server.common.msg.stats.StatsType; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class TbRuleEngineConsumerStats { @@ -60,18 +58,18 @@ public class TbRuleEngineConsumerStats { private final String queueName; - public TbRuleEngineConsumerStats(String queueName, StatsCounterFactory counterFactory) { + public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) { this.queueName = queueName; String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName; - this.totalMsgCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS); - this.successMsgCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS); - this.timeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TIMEOUT_MSGS); - this.failedMsgCounter = counterFactory.createStatsCounter(statsKey, FAILED_MSGS); - this.tmpTimeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_TIMEOUT); - this.tmpFailedMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_FAILED); - this.successIterationsCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS); - this.failedIterationsCounter = counterFactory.createStatsCounter(statsKey, FAILED_ITERATIONS); + this.totalMsgCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS); + this.successMsgCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS); + this.timeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TIMEOUT_MSGS); + this.failedMsgCounter = statsFactory.createStatsCounter(statsKey, FAILED_MSGS); + this.tmpTimeoutMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_TIMEOUT); + this.tmpFailedMsgCounter = statsFactory.createStatsCounter(statsKey, TMP_FAILED); + this.successIterationsCounter = statsFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS); + this.failedIterationsCounter = statsFactory.createStatsCounter(statsKey, FAILED_ITERATIONS); counters.add(totalMsgCounter); counters.add(successMsgCounter); diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java index 24193d8d09..5b5c440f98 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultJsInvokeStats.java @@ -18,6 +18,9 @@ package org.thingsboard.server.service.stats; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.actors.JsInvokeStats; +import org.thingsboard.server.common.msg.stats.StatsCounter; +import org.thingsboard.server.common.msg.stats.StatsFactory; +import org.thingsboard.server.common.msg.stats.StatsType; import javax.annotation.PostConstruct; @@ -32,14 +35,14 @@ public class DefaultJsInvokeStats implements JsInvokeStats { private StatsCounter failuresCounter; @Autowired - private StatsCounterFactory counterFactory; + private StatsFactory statsFactory; @PostConstruct public void init() { String key = StatsType.JS_INVOKE.getName(); - this.requestsCounter = counterFactory.createStatsCounter(key, REQUESTS); - this.responsesCounter = counterFactory.createStatsCounter(key, RESPONSES); - this.failuresCounter = counterFactory.createStatsCounter(key, FAILURES); + this.requestsCounter = statsFactory.createStatsCounter(key, REQUESTS); + this.responsesCounter = statsFactory.createStatsCounter(key, RESPONSES); + this.failuresCounter = statsFactory.createStatsCounter(key, FAILURES); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java index 5b9fc91bdf..37cfb9bbe6 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreTransportApiService.java @@ -20,6 +20,9 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.msg.stats.MessagesStats; +import org.thingsboard.server.common.msg.stats.StatsFactory; +import org.thingsboard.server.common.msg.stats.StatsType; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; @@ -29,10 +32,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.stats.DefaultQueueStats; -import org.thingsboard.server.service.stats.StatsCounter; -import org.thingsboard.server.service.stats.StatsCounterFactory; -import org.thingsboard.server.service.stats.StatsType; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -45,13 +44,9 @@ import java.util.concurrent.*; @Service @TbCoreComponent public class TbCoreTransportApiService { - private static final String TOTAL_MSGS = "totalMsgs"; - private static final String SUCCESSFUL_MSGS = "successfulMsgs"; - private static final String FAILED_MSGS = "failedMsgs"; - private final TbCoreQueueFactory tbCoreQueueFactory; private final TransportApiService transportApiService; - private final StatsCounterFactory counterFactory; + private final StatsFactory statsFactory; @Value("${queue.transport_api.max_pending_requests:10000}") private int maxPendingRequests; @@ -66,10 +61,10 @@ public class TbCoreTransportApiService { private TbQueueResponseTemplate, TbProtoQueueMsg> transportApiTemplate; - public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsCounterFactory counterFactory) { + public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsFactory statsFactory) { this.tbCoreQueueFactory = tbCoreQueueFactory; this.transportApiService = transportApiService; - this.counterFactory = counterFactory; + this.statsFactory = statsFactory; } @PostConstruct @@ -79,10 +74,7 @@ public class TbCoreTransportApiService { TbQueueConsumer> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer(); String key = StatsType.TRANSPORT.getName(); - StatsCounter totalCounter = counterFactory.createStatsCounter(key, TOTAL_MSGS); - StatsCounter successfulCounter = counterFactory.createStatsCounter(key, SUCCESSFUL_MSGS); - StatsCounter failedCounter = counterFactory.createStatsCounter(key, FAILED_MSGS); - DefaultQueueStats queueStats = new DefaultQueueStats(totalCounter, successfulCounter, failedCounter); + MessagesStats queueStats = statsFactory.createMessagesStats(key); DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder , TbProtoQueueMsg> builder = DefaultTbQueueResponseTemplate.builder(); diff --git a/common/message/pom.xml b/common/message/pom.xml index 066beec7a1..413ce524b0 100644 --- a/common/message/pom.xml +++ b/common/message/pom.xml @@ -69,6 +69,21 @@ protobuf-java provided + + + + org.springframework.boot + spring-boot-starter-actuator + + + io.micrometer + micrometer-core + + + io.micrometer + micrometer-registry-prometheus + + junit junit diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultMessagesStats.java similarity index 81% rename from application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultMessagesStats.java index d6c42a82b5..29990eaea4 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultQueueStats.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultMessagesStats.java @@ -13,16 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.stats; +package org.thingsboard.server.common.msg.stats; -import org.thingsboard.server.queue.stats.QueueStats; - -public class DefaultQueueStats implements QueueStats { +public class DefaultMessagesStats implements MessagesStats { private final StatsCounter totalCounter; private final StatsCounter successfulCounter; private final StatsCounter failedCounter; - public DefaultQueueStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) { + public DefaultMessagesStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) { this.totalCounter = totalCounter; this.successfulCounter = successfulCounter; this.failedCounter = failedCounter; diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultStatsFactory.java similarity index 59% rename from application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultStatsFactory.java index f8276bef33..e34f6e0ff2 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounterFactory.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/DefaultStatsFactory.java @@ -13,19 +13,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.stats; +package org.thingsboard.server.common.msg.stats; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import org.thingsboard.server.service.metrics.StubCounter; import java.util.concurrent.atomic.AtomicInteger; @Service -public class StatsCounterFactory { +public class DefaultStatsFactory implements StatsFactory { + private static final String TOTAL_MSGS = "totalMsgs"; + private static final String SUCCESSFUL_MSGS = "successfulMsgs"; + private static final String FAILED_MSGS = "failedMsgs"; + private static final String STATS_NAME_TAG = "statsName"; private static final Counter STUB_COUNTER = new StubCounter(); @@ -33,9 +36,10 @@ public class StatsCounterFactory { @Autowired private MeterRegistry meterRegistry; - @Value("${metrics.enabled}") + @Value("${metrics.enabled:false}") private Boolean metricsEnabled; + @Override public StatsCounter createStatsCounter(String key, String statsName) { return new StatsCounter( new AtomicInteger(0), @@ -45,4 +49,27 @@ public class StatsCounterFactory { statsName ); } + + @Override + public MessagesStats createMessagesStats(String key) { + StatsCounter totalCounter = createStatsCounter(key, TOTAL_MSGS); + StatsCounter successfulCounter = createStatsCounter(key, SUCCESSFUL_MSGS); + StatsCounter failedCounter = createStatsCounter(key, FAILED_MSGS); + return new DefaultMessagesStats(totalCounter, successfulCounter, failedCounter); + } + + private static class StubCounter implements Counter { + @Override + public void increment(double amount) {} + + @Override + public double count() { + return 0; + } + + @Override + public Id getId() { + return null; + } + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/MessagesStats.java similarity index 92% rename from common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/MessagesStats.java index d5ea3acdf1..077d2ce01a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/stats/QueueStats.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/MessagesStats.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.queue.stats; +package org.thingsboard.server.common.msg.stats; -public interface QueueStats { +public interface MessagesStats { default void incrementTotal() { incrementTotal(1); } diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsCounter.java similarity index 96% rename from application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsCounter.java index 17f27e9d46..0097949cb7 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/StatsCounter.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsCounter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.stats; +package org.thingsboard.server.common.msg.stats; import io.micrometer.core.instrument.Counter; diff --git a/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsFactory.java similarity index 65% rename from application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsFactory.java index f7a3117554..243c1e05a0 100644 --- a/application/src/main/java/org/thingsboard/server/service/metrics/StubCounter.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsFactory.java @@ -13,21 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.metrics; +package org.thingsboard.server.common.msg.stats; -import io.micrometer.core.instrument.Counter; +public interface StatsFactory { + StatsCounter createStatsCounter(String key, String statsName); -public class StubCounter implements Counter { - @Override - public void increment(double amount) {} - - @Override - public double count() { - return 0; - } - - @Override - public Id getId() { - return null; - } + MessagesStats createMessagesStats(String key); } diff --git a/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsType.java similarity index 94% rename from application/src/main/java/org/thingsboard/server/service/stats/StatsType.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsType.java index a831498993..825f9fcac8 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/StatsType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/stats/StatsType.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.stats; +package org.thingsboard.server.common.msg.stats; public enum StatsType { RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke"); diff --git a/common/queue/pom.xml b/common/queue/pom.xml index 2072d34b40..82f62b4d2e 100644 --- a/common/queue/pom.xml +++ b/common/queue/pom.xml @@ -112,6 +112,7 @@ org.apache.curator curator-recipes + junit junit diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java index 20530360d6..76c243c270 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueRequestTemplate.java @@ -16,6 +16,7 @@ package org.thingsboard.server.queue; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.msg.stats.MessagesStats; public interface TbQueueRequestTemplate { @@ -25,4 +26,5 @@ public interface TbQueueRequestTemplate requestTemplate, @@ -153,6 +156,11 @@ public class DefaultTbQueueRequestTemplate send(Request request) { if (tickSize > maxPendingRequests) { @@ -166,14 +174,17 @@ public class DefaultTbQueueRequestTemplate responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future); pendingRequests.putIfAbsent(requestId, responseMetaData); log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime); + if (messagesStats != null) messagesStats.incrementTotal(); requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { + if (messagesStats != null) messagesStats.incrementSuccessful(); log.trace("[{}] Request sent: {}", requestId, metadata); } @Override public void onFailure(Throwable t) { + if (messagesStats != null) messagesStats.incrementFailed(); pendingRequests.remove(requestId); future.setException(t); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java index 602a180170..6e9870c7fe 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java @@ -23,7 +23,7 @@ import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueResponseTemplate; -import org.thingsboard.server.queue.stats.QueueStats; +import org.thingsboard.server.common.msg.stats.MessagesStats; import java.util.List; import java.util.UUID; @@ -45,7 +45,7 @@ public class DefaultTbQueueResponseTemplate(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 14f01a6d63..6162337ef6 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -55,6 +55,9 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; +import org.thingsboard.server.common.msg.stats.MessagesStats; +import org.thingsboard.server.common.msg.stats.StatsFactory; +import org.thingsboard.server.common.msg.stats.StatsType; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -101,12 +104,17 @@ public class DefaultTransportService implements TransportService { private final TbQueueProducerProvider producerProvider; private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; + private final StatsFactory statsFactory; protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate; protected TbQueueProducer> ruleEngineMsgProducer; protected TbQueueProducer> tbCoreMsgProducer; protected TbQueueConsumer> transportNotificationsConsumer; + protected MessagesStats ruleEngineProducerStats; + protected MessagesStats tbCoreProducerStats; + protected MessagesStats transportApiStats; + protected ScheduledExecutorService schedulerExecutor; protected ExecutorService transportCallbackExecutor; @@ -119,11 +127,12 @@ public class DefaultTransportService implements TransportService { private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); private volatile boolean stopped = false; - public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService) { + public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory) { this.serviceInfoProvider = serviceInfoProvider; this.queueProvider = queueProvider; this.producerProvider = producerProvider; this.partitionService = partitionService; + this.statsFactory = statsFactory; } @PostConstruct @@ -133,10 +142,14 @@ public class DefaultTransportService implements TransportService { new TbRateLimits(perTenantLimitsConf); new TbRateLimits(perDevicesLimitsConf); } + this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer"); + this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer"); + this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer"); this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler")); this.transportCallbackExecutor = Executors.newWorkStealingPool(20); this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate(); + transportApiRequestTemplate.setMessagesStats(transportApiStats); ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer(); tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer(); @@ -557,10 +570,14 @@ public class DefaultTransportService implements TransportService { if (log.isTraceEnabled()) { log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg); } + TransportTbQueueCallback transportTbQueueCallback = callback != null ? + new TransportTbQueueCallback(callback) : null; + tbCoreProducerStats.incrementTotal(); + StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), - ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ? - new TransportTbQueueCallback(callback) : null); + ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), + wrappedCallback); } protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) { @@ -571,7 +588,9 @@ public class DefaultTransportService implements TransportService { ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)) .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); - ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback); + ruleEngineProducerStats.incrementTotal(); + StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats); + ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); } private class TransportTbQueueCallback implements TbQueueCallback { @@ -592,6 +611,30 @@ public class DefaultTransportService implements TransportService { } } + private class StatsCallback implements TbQueueCallback { + private final TbQueueCallback callback; + private final MessagesStats stats; + + private StatsCallback(TbQueueCallback callback, MessagesStats stats) { + this.callback = callback; + this.stats = stats; + } + + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + stats.incrementSuccessful(); + if (callback != null) + callback.onSuccess(metadata); + } + + @Override + public void onFailure(Throwable t) { + stats.incrementFailed(); + if (callback != null) + callback.onFailure(t); + } + } + private class MsgPackCallback implements TbQueueCallback { private final AtomicInteger msgCount; private final TransportServiceCallback callback; diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 4ab8921d4b..c34dba7c90 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -14,8 +14,16 @@ # limitations under the License. # -spring.main.web-environment: false -spring.main.web-application-type: none +# If you enabled process metrics you should also enable 'web-environment'. +spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}" +# If you enabled process metrics you should set 'web-application-type' to 'servlet' value. +spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}" + +server: + # Server bind address (has no effect if web-environment is disabled). + address: "${HTTP_BIND_ADDRESS:0.0.0.0}" + # Server bind port (has no effect if web-environment is disabled). + port: "${HTTP_BIND_PORT:8083}" # Zookeeper connection parameters. Used for service discovery. zk: @@ -211,4 +219,16 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. \ No newline at end of file + tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. + + +metrics: + # Enable/disable actuator metrics. + enabled: "${METRICS_ENABLED:false}" + +management: + endpoints: + web: + exposure: + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). + include: '${METRICS_ENDPOINTS_EXPOSE:info}' \ No newline at end of file diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 813fd41734..377c66e711 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -212,4 +212,16 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. \ No newline at end of file + tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. + + +metrics: + # Enable/disable actuator metrics. + enabled: "${METRICS_ENABLED:false}" + +management: + endpoints: + web: + exposure: + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). + include: '${METRICS_ENDPOINTS_EXPOSE:info}' \ No newline at end of file diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index bb3db211ef..bcc27d0755 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -14,8 +14,16 @@ # limitations under the License. # -spring.main.web-environment: false -spring.main.web-application-type: none +# If you enabled process metrics you should also enable 'web-environment'. +spring.main.web-environment: "${WEB_APPLICATION_ENABLE:false}" +# If you enabled process metrics you should set 'web-application-type' to 'servlet' value. +spring.main.web-application-type: "${WEB_APPLICATION_TYPE:none}" + +server: + # Server bind address (has no effect if web-environment is disabled). + address: "${HTTP_BIND_ADDRESS:0.0.0.0}" + # Server bind port (has no effect if web-environment is disabled). + port: "${HTTP_BIND_PORT:8083}" # Zookeeper connection parameters. Used for service discovery. zk: @@ -232,4 +240,15 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. \ No newline at end of file + tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. + +metrics: + # Enable/disable actuator metrics. + enabled: "${METRICS_ENABLED:false}" + +management: + endpoints: + web: + exposure: + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). + include: '${METRICS_ENDPOINTS_EXPOSE:info}' \ No newline at end of file