diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java b/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java index 7fd0b12077..3a93f318d1 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java @@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.server.common.data.edqs.EdqsState; import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.TenantId; @@ -149,9 +150,9 @@ public class EntityQueryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN')") - @GetMapping("/edqs/enabled") - public boolean isEdqsApiEnabled() { - return edqsApiService.isEnabled(); + @GetMapping("/edqs/state") + public EdqsState getEdqsState() { + return edqsService.getState(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index 91c08ab6e0..f1cb25c6fa 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -72,7 +72,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF @Override public void restore(QueueKey queueKey, Set partitions) { - stateService.update(queueKey, partitions); + stateService.update(queueKey, partitions, null); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java index c7e17b62ae..e0f0db82cc 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java @@ -22,7 +22,6 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; @@ -51,11 +50,6 @@ public class DefaultEdqsApiService implements EdqsApiService { private final EdqsClientQueueFactory queueFactory; private TbQueueRequestTemplate, TbProtoQueueMsg> requestTemplate; - @Value("${queue.edqs.api.auto_enable:true}") - private boolean autoEnable; - - private Boolean apiEnabled = null; - @PostConstruct private void init() { requestTemplate = queueFactory.createEdqsRequestTemplate(); @@ -85,31 +79,11 @@ public class DefaultEdqsApiService implements EdqsApiService { }, MoreExecutors.directExecutor()); } - @Override - public boolean isEnabled() { - return Boolean.TRUE.equals(apiEnabled); - } - - @Override - public void setEnabled(boolean enabled) { - if (enabled) { - log.info("Enabling EDQS API"); - } else { - log.info("Disabling EDQS API"); - } - apiEnabled = enabled; - } - @Override public boolean isSupported() { return true; } - @Override - public boolean isAutoEnable() { - return autoEnable; - } - @PreDestroy private void stop() { requestTemplate.stop(); diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index cd46c4ba96..92ae38f7e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -20,11 +20,13 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -36,6 +38,9 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsState; +import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode; +import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus; import org.thingsboard.server.common.data.edqs.EdqsSyncRequest; import org.thingsboard.server.common.data.edqs.Entity; import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg; @@ -51,20 +56,25 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.edqs.processor.EdqsProducer; import org.thingsboard.server.edqs.state.EdqsPartitionService; -import org.thingsboard.server.edqs.util.EdqsConverter; +import org.thingsboard.server.edqs.util.DefaultEdqsMapper; +import org.thingsboard.server.edqs.util.EdqsMapper; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsCoreServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.HashPartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.environment.DistributedLock; import org.thingsboard.server.queue.environment.DistributedLockService; import org.thingsboard.server.queue.provider.EdqsClientQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Service @@ -74,31 +84,42 @@ import java.util.concurrent.TimeUnit; public class DefaultEdqsService implements EdqsService { private final EdqsClientQueueFactory queueFactory; - private final EdqsConverter edqsConverter; + private final EdqsMapper edqsMapper; private final EdqsSyncService edqsSyncService; private final EdqsApiService edqsApiService; private final DistributedLockService distributedLockService; private final AttributesService attributesService; private final EdqsPartitionService edqsPartitionService; - private final TopicService topicService; private final TbServiceInfoProvider serviceInfoProvider; + private final DiscoveryService discoveryService; @Autowired @Lazy private TbClusterService clusterService; @Autowired @Lazy private HashPartitionService hashPartitionService; + @Value("${queue.edqs.api.auto_enable:true}") + private boolean autoEnableApi; + @Value("${queue.edqs.readiness_check_interval:60000}") + private int edqsReadinessCheckInterval; + private EdqsProducer eventsProducer; private ExecutorService executor; + private ScheduledExecutorService scheduler; private DistributedLock syncLock; + @Getter + private EdqsState state; + @PostConstruct private void init() { executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass()); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-check"); eventsProducer = EdqsProducer.builder() .producer(queueFactory.createEdqsEventsProducer()) .partitionService(edqsPartitionService) .build(); syncLock = distributedLockService.getLock("edqs_sync"); + state = new EdqsState(); } @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) @@ -106,6 +127,26 @@ public class DefaultEdqsService implements EdqsService { if (!serviceInfoProvider.isService(ServiceType.TB_CORE)) { return; } + if (edqsApiService.isSupported()) { + scheduler.scheduleWithFixedDelay(() -> { + if (!hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) { + return; + } + + List servers = new ArrayList<>(discoveryService.getOtherServers()); + servers.add(serviceInfoProvider.getServiceInfo()); + + List readyEdqsServers = servers.stream() + .filter(serviceInfo -> serviceInfo.getServiceTypesList().contains(ServiceType.EDQS.name())) + .filter(ServiceInfo::getReady) + .toList(); + boolean changed = state.setEdqsReady(!readyEdqsServers.isEmpty()); + if (changed) { + broadcastEdqsReady(state.getEdqsReady()); + } + }, 0, edqsReadinessCheckInterval, TimeUnit.MILLISECONDS); + } + executor.submit(() -> { try { EdqsSyncState syncState = getSyncState(); @@ -115,9 +156,9 @@ public class DefaultEdqsService implements EdqsService { .syncRequest(new EdqsSyncRequest()) .build()); } - } else if (edqsApiService.isSupported() && edqsApiService.isAutoEnable()) { + } else { // only if topic/RocksDB is not empty and sync is finished - edqsApiService.setEnabled(true); + onSyncStatusUpdate(EdqsSyncStatus.FINISHED); } } catch (Throwable e) { log.error("Failed to start EDQS service", e); @@ -131,7 +172,10 @@ public class DefaultEdqsService implements EdqsService { if (request.getSyncRequest() != null) { saveSyncState(EdqsSyncStatus.REQUESTED); } - broadcast(request.toInternalMsg()); + broadcast(ToCoreEdqsMsg.builder() + .syncRequest(request.getSyncRequest()) + .apiEnabled(request.getApiEnabled()) + .build()); } @Override @@ -140,7 +184,18 @@ public class DefaultEdqsService implements EdqsService { log.info("Processing system msg {}", msg); try { if (msg.getApiEnabled() != null) { - edqsApiService.setEnabled(msg.getApiEnabled()); + if (msg.getApiEnabled()) { + state.setApiMode(EdqsApiMode.ENABLED); + } else { + state.setApiMode(EdqsApiMode.DISABLED); + } + log.info("New state: {}", state); + } + if (msg.getEdqsReady() != null) { + onEdqsReady(msg.getEdqsReady()); + } + if (msg.getSyncStatus() != null) { + onSyncStatusUpdate(msg.getSyncStatus()); } if (msg.getSyncRequest() != null) { @@ -154,23 +209,16 @@ public class DefaultEdqsService implements EdqsService { return; } } - saveSyncState(EdqsSyncStatus.STARTED); + edqsSyncService.sync(); - saveSyncState(EdqsSyncStatus.FINISHED); - if (edqsApiService.isSupported()) - if (edqsApiService.isAutoEnable()) { - log.info("EDQS sync is finished, auto-enabling API"); - broadcast(ToCoreEdqsMsg.builder() - .apiEnabled(Boolean.TRUE) - .build()); - } else { - log.info("EDQS sync is finished, but leaving API disabled"); - } + saveSyncState(EdqsSyncStatus.FINISHED); + broadcastSyncStatusUpdate(EdqsSyncStatus.FINISHED); } catch (Exception e) { log.error("Failed to complete sync", e); saveSyncState(EdqsSyncStatus.FAILED); + broadcastSyncStatusUpdate(EdqsSyncStatus.FAILED); } finally { syncLock.unlock(); } @@ -181,6 +229,60 @@ public class DefaultEdqsService implements EdqsService { }); } + private void broadcastEdqsReady(boolean ready) { + broadcast(ToCoreEdqsMsg.builder() + .edqsReady(ready) + .build()); + } + + private void onEdqsReady(boolean ready) { + state.setEdqsReady(ready); + checkState(); + } + + private void broadcastSyncStatusUpdate(EdqsSyncStatus status) { + broadcast(ToCoreEdqsMsg.builder() + .syncStatus(status) + .build()); + } + + private void onSyncStatusUpdate(EdqsSyncStatus status) { + state.setSyncStatus(status); + checkState(); + } + + private void checkState() { + if (!edqsApiService.isSupported()) { + log.info("New state: {}. EDQS API not supported", state); + return; + } + + if (state.isApiReady()) { + if (autoEnableApi) { + if (state.getApiMode() == null || state.getApiMode() == EdqsApiMode.AUTO_DISABLED) { + state.setApiMode(EdqsApiMode.AUTO_ENABLED); + log.info("New state: {}. Auto-enabled EDQS API", state); + } else { + log.info("New state: {}. API mode left as is", state); + } + } else { + log.info("New state: {}. API auto-enabling is disabled", state); + } + } else { + if (state.isApiEnabled()) { + state.setApiMode(EdqsApiMode.AUTO_DISABLED); + log.info("New state: {}. Disabled EDQS API", state); + } else { + log.info("New state: {}. API left disabled", state); + } + } + } + + @Override + public boolean isApiEnabled() { + return state.isApiEnabled(); + } + @Override public void onUpdate(TenantId tenantId, EntityId entityId, Object entity) { EntityType entityType = entityId.getEntityType(); @@ -189,7 +291,7 @@ public class DefaultEdqsService implements EdqsService { log.trace("[{}][{}] Ignoring update event, type {} not supported", tenantId, entityId, entityType); return; } - onUpdate(tenantId, objectType, edqsConverter.toEntity(entityType, entity)); + onUpdate(tenantId, objectType, DefaultEdqsMapper.toEntity(entityType, entity)); } @Override @@ -216,12 +318,11 @@ public class DefaultEdqsService implements EdqsService { protected void processEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType, EdqsObject object) { executor.submit(() -> { try { - String key = object.key(); + String key = object.stringKey(); Long version = object.version(); EdqsEventMsg.Builder eventMsg = EdqsEventMsg.newBuilder() - .setKey(key) .setObjectType(objectType.name()) - .setData(ByteString.copyFrom(edqsConverter.serialize(objectType, object))) + .setData(ByteString.copyFrom(edqsMapper.serialize(object))) .setEventType(eventType.name()); if (version != null) { eventMsg.setVersion(version); @@ -278,6 +379,7 @@ public class DefaultEdqsService implements EdqsService { @PreDestroy private void stop() { executor.shutdown(); + scheduler.shutdownNow(); eventsProducer.stop(); } @@ -288,11 +390,4 @@ public class DefaultEdqsService implements EdqsService { private EdqsSyncStatus status; } - private enum EdqsSyncStatus { - REQUESTED, - STARTED, - FINISHED, - FAILED - } - } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 2909e7cb05..ab64d18500 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1764,6 +1764,8 @@ queue: supported: "${TB_EDQS_API_SUPPORTED:false}" # Whether to auto-enable EDQS API (if queue.edqs.api.supported is true) when sync of data to Kafka is finished auto_enable: "${TB_EDQS_API_AUTO_ENABLE:true}" + # Interval in milliseconds to check for ready EDQS servers + readiness_check_interval: "${TB_EDQS_READINESS_CHECK_INTERVAL_MS:60000}" # Mode of EDQS: local (for monolith) or remote (with separate EDQS microservices) mode: "${TB_EDQS_MODE:local}" local: @@ -1787,6 +1789,10 @@ queue: max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}" # Maximum timeout for requests to EDQS max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" + # Thread pool size for EDQS requests executor + request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}" + # Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process. + versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}" # Strings longer than this threshold will be compressed string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}" stats: diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index 153ec2d26f..1bea463159 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -15,21 +15,27 @@ */ package org.thingsboard.server.controller; +import org.assertj.core.api.ThrowingConsumer; import org.junit.Before; +import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.common.data.edqs.EdqsState; +import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode; +import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataQuery; -import org.thingsboard.server.common.msg.edqs.EdqsApiService; +import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.dao.service.DaoSqlTest; -import org.thingsboard.server.edqs.state.EdqsStateService; import org.thingsboard.server.edqs.util.EdqsRocksDb; +import org.thingsboard.server.queue.discovery.DiscoveryService; import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @DaoSqlTest @@ -39,22 +45,23 @@ import static org.awaitility.Awaitility.await; "queue.edqs.sync.enabled=true", "queue.edqs.api.supported=true", "queue.edqs.api.auto_enable=true", - "queue.edqs.mode=local" + "queue.edqs.mode=local", + "queue.edqs.readiness_check_interval=500" }) public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest { @Autowired - private EdqsApiService edqsApiService; + private EdqsService edqsService; @Autowired - private EdqsStateService edqsStateService; + private DiscoveryService discoveryService; @MockBean // so that we don't do backup for tests private EdqsRocksDb edqsRocksDb; @Before public void before() { - await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsApiService.isEnabled() && edqsStateService.isReady()); + await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.getState().isApiEnabled()); } @Override @@ -69,4 +76,61 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest { result -> result == expectedResult); } + @Test + public void testEdqsState() { + assertThat(edqsService.getState().getApiMode()).isEqualTo(EdqsApiMode.AUTO_ENABLED); + + // notifying EDQS is not ready: API should be auto-disabled + discoveryService.setReady(false); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED); + assertThat(state.getEdqsReady()).isFalse(); + assertThat(state.isApiEnabled()).isFalse(); + }); + + // manually disabling API + edqsService.processSystemRequest(ToCoreEdqsRequest.builder() + .apiEnabled(false) + .build()); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED); + assertThat(state.getEdqsReady()).isFalse(); + assertThat(state.isApiEnabled()).isFalse(); + }); + + // notifying EDQS is ready: API should not be enabled automatically because manually disabled previously + discoveryService.setReady(true); + verifyState(state -> { + assertThat(state.getEdqsReady()).isTrue(); + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED); + assertThat(state.isApiEnabled()).isFalse(); + }); + + // manually enabling API + edqsService.processSystemRequest(ToCoreEdqsRequest.builder() + .apiEnabled(true) + .build()); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.ENABLED); + assertThat(state.getEdqsReady()).isTrue(); + assertThat(state.isApiEnabled()).isTrue(); + }); + + // notifying EDQS is not ready: API should be auto-disabled + discoveryService.setReady(false); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED); + assertThat(state.getEdqsReady()).isFalse(); + assertThat(state.isApiEnabled()).isFalse(); + }); + + discoveryService.setReady(true); + } + + private void verifyState(ThrowingConsumer assertion) { + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(edqsService.getState()).satisfies(assertion); + }); + } + } diff --git a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java index 29a208a805..26c02e8704 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java @@ -22,6 +22,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.ResultActions; import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; import org.thingsboard.common.util.JacksonUtil; @@ -78,6 +79,10 @@ import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @DaoSqlTest +@TestPropertySource(properties = { + "queue.edqs.sync.enabled=true", // only enabling sync + "queue.edqs.api.supported=false", +}) public class EntityQueryControllerTest extends AbstractControllerTest { private static final String CUSTOMER_USER_EMAIL = "entityQueryCustomer@thingsboard.org"; @@ -803,7 +808,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest { //assign dashboard doPost("/api/customer/" + savedCustomer.getId().getId().toString() - + "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class); + + "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class); // check entity data query by customer User customerUser = new User(); diff --git a/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java b/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java index 5264e69bd3..2f244807bd 100644 --- a/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java @@ -33,7 +33,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.RelationsQueryFilter; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter; -import org.thingsboard.server.common.msg.edqs.EdqsApiService; +import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.edqs.util.EdqsRocksDb; @@ -53,19 +53,20 @@ import static org.awaitility.Awaitility.await; "queue.edqs.sync.enabled=true", "queue.edqs.api.supported=true", "queue.edqs.api.auto_enable=true", - "queue.edqs.mode=local" + "queue.edqs.mode=local", + "queue.edqs.readiness_check_interval=1000" }) public class EdqsEntityServiceTest extends EntityServiceTest { @Autowired - private EdqsApiService edqsApiService; + private EdqsService edqsService; @MockBean private EdqsRocksDb edqsRocksDb; @Before public void beforeEach() { - await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsApiService.isEnabled()); + await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled()); } // sql implementation has a bug with data duplication, edqs implementation returns correct value diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java index 51fd735a9d..a374928b16 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java @@ -19,11 +19,10 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; public interface TbQueueProducer { - void init(); - String getDefaultTopic(); void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback); void stop(); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java index c162365257..3ffd6ad424 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java @@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import java.util.UUID; + @Data @AllArgsConstructor @NoArgsConstructor @@ -58,7 +60,7 @@ public class AttributeKv implements EdqsObject { } @Override - public String key() { + public String stringKey() { return "a_" + entityId + "_" + scope + "_" + key; } @@ -72,4 +74,6 @@ public class AttributeKv implements EdqsObject { return ObjectType.ATTRIBUTE_KV; } + public record Key(UUID entityId, AttributeScope scope, int key) implements EdqsObjectKey {} + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java index a74c90208a..f4685f896a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java @@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.ObjectType; public interface EdqsObject { @JsonIgnore - String key(); + String stringKey(); @JsonIgnore Long version(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java new file mode 100644 index 0000000000..8d48bbd383 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java @@ -0,0 +1,18 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.edqs; + +public interface EdqsObjectKey {} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java new file mode 100644 index 0000000000..7c3b328022 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.edqs; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.commons.lang3.BooleanUtils; + +@Getter +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class EdqsState { + + private Boolean edqsReady; + @Setter + private EdqsSyncStatus syncStatus; + @Setter + private EdqsApiMode apiMode; + + public boolean setEdqsReady(boolean ready) { + boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.edqsReady, false) != ready; + this.edqsReady = ready; + return changed; + } + + public boolean isApiReady() { + return edqsReady && syncStatus == EdqsSyncStatus.FINISHED; + } + + public boolean isApiEnabled() { + return apiMode != null && (apiMode == EdqsApiMode.ENABLED || apiMode == EdqsApiMode.AUTO_ENABLED); + } + + @Override + public String toString() { + return '[' + + "EDQS ready: " + edqsReady + + ", sync status: " + syncStatus + + ", API mode: " + apiMode + + ']'; + } + + public enum EdqsSyncStatus { + REQUESTED, + STARTED, + FINISHED, + FAILED + } + + public enum EdqsApiMode { + ENABLED, + AUTO_ENABLED, + DISABLED, + AUTO_DISABLED + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java index c22ef147e3..96a5b26aee 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java @@ -51,7 +51,7 @@ public class Entity implements EdqsObject { } @Override - public String key() { + public String stringKey() { return "e_" + fields.getId().toString(); } @@ -65,4 +65,6 @@ public class Entity implements EdqsObject { return ObjectType.fromEntityType(type); } + public record Key(UUID id) implements EdqsObjectKey {} + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java index 8bd69c41a4..12695de423 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java @@ -24,6 +24,8 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import java.util.UUID; + @Data @AllArgsConstructor @NoArgsConstructor @@ -53,7 +55,8 @@ public class LatestTsKv implements EdqsObject { this.version = version != null ? version : 0L; } - public String key() { + @Override + public String stringKey() { return "l_" + entityId + "_" + key; } @@ -67,4 +70,6 @@ public class LatestTsKv implements EdqsObject { return ObjectType.LATEST_TS_KV; } + public record Key(UUID entityId, int key) implements EdqsObjectKey {} + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsMsg.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsMsg.java index 78bebba20a..d581a5d17c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsMsg.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsMsg.java @@ -19,6 +19,7 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus; @Data @AllArgsConstructor @@ -29,4 +30,7 @@ public class ToCoreEdqsMsg { private EdqsSyncRequest syncRequest; private Boolean apiEnabled; + private EdqsSyncStatus syncStatus; + private Boolean edqsReady; + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsRequest.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsRequest.java index c4f262fbf0..44bbfa2cc9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsRequest.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsRequest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.common.data.edqs; -import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -30,9 +29,4 @@ public class ToCoreEdqsRequest { private EdqsSyncRequest syncRequest; private Boolean apiEnabled; - @JsonIgnore - public ToCoreEdqsMsg toInternalMsg() { - return new ToCoreEdqsMsg(syncRequest, apiEnabled); - } - } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java index 1b0975542c..2eae806f43 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java @@ -29,9 +29,7 @@ public interface EntityFields { Logger log = LoggerFactory.getLogger(EntityFields.class); - default UUID getId() { - return null; - } + UUID getId(); default UUID getTenantId() { return null; @@ -147,6 +145,7 @@ public interface EntityFields { default String getAsString(String key) { return switch (key) { + case "id" -> getId().toString(); case "createdTime" -> Long.toString(getCreatedTime()); case "title" -> getName(); case "type" -> getType(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java index 002d8c1b82..1830310222 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java @@ -27,10 +27,12 @@ import org.thingsboard.server.common.data.BaseDataWithAdditionalInfo; import org.thingsboard.server.common.data.HasVersion; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.validation.Length; import java.io.Serializable; +import java.util.UUID; @Slf4j @Schema @@ -119,8 +121,8 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject { BaseDataWithAdditionalInfo.setJson(addInfo, json -> this.additionalInfo = json, bytes -> this.additionalInfoBytes = bytes); } - @JsonIgnore - public String key() { + @Override + public String stringKey() { return "r_" + from + "_" + to + "_" + typeGroup + "_" + type; } @@ -134,4 +136,6 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject { return ObjectType.RELATION; } + public record Key(UUID from, UUID to, RelationTypeGroup typeGroup, String type) implements EdqsObjectKey {} + } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 72919c3fc0..9fe6b2843e 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -18,7 +18,6 @@ package org.thingsboard.server.edqs.processor; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Getter; @@ -29,7 +28,6 @@ import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ExceptionUtil; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEvent; @@ -47,31 +45,31 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.edqs.repo.EdqsRepository; import org.thingsboard.server.edqs.state.EdqsPartitionService; import org.thingsboard.server.edqs.state.EdqsStateService; -import org.thingsboard.server.edqs.util.EdqsConverter; +import org.thingsboard.server.edqs.util.EdqsMapper; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.TbQueueHandler; -import org.thingsboard.server.queue.TbQueueResponseTemplate; +import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.edqs.EdqsComponent; import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsConfig.EdqsPartitioningStrategy; +import org.thingsboard.server.queue.edqs.EdqsExecutors; import org.thingsboard.server.queue.edqs.EdqsQueueFactory; -import org.thingsboard.server.queue.util.AfterStartUp; +import java.util.List; import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -85,24 +83,20 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop public class EdqsProcessor implements TbQueueHandler, TbProtoQueueMsg> { private final EdqsQueueFactory queueFactory; - private final EdqsConverter converter; + private final EdqsMapper mapper; private final EdqsRepository repository; private final EdqsConfig config; + private final EdqsExecutors edqsExecutors; private final EdqsPartitionService partitionService; + private final DiscoveryService discoveryService; private final TopicService topicService; private final ConfigurableApplicationContext applicationContext; private final EdqsStateService stateService; private PartitionedQueueConsumerManager> eventConsumer; - private TbQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; - - private ExecutorService consumersExecutor; - private ExecutorService taskExecutor; - private ScheduledExecutorService scheduler; + private PartitionedQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; private ListeningExecutorService requestExecutor; - - private final VersionsStore versionsStore = new VersionsStore(); - + private VersionsStore versionsStore; private final AtomicInteger counter = new AtomicInteger(); @Getter @@ -110,18 +104,17 @@ public class EdqsProcessor implements TbQueueHandler, @PostConstruct private void init() { - consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer")); - taskExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-task-executor"); - scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-scheduler"); - requestExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(12, "edqs-requests")); errorHandler = error -> { if (error instanceof OutOfMemoryError) { log.error("OOM detected, shutting down"); repository.clear(); + discoveryService.setReady(false); Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edqs-shutdown")) .execute(applicationContext::close); } }; + requestExecutor = edqsExecutors.getRequestExecutor(); + versionsStore = new VersionsStore(config.getVersionsCacheTtl()); eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic())) @@ -143,19 +136,14 @@ public class EdqsProcessor implements TbQueueHandler, }) .consumerCreator((config, tpi) -> queueFactory.createEdqsEventsConsumer()) .queueAdmin(queueFactory.getEdqsQueueAdmin()) - .consumerExecutor(consumersExecutor) - .taskExecutor(taskExecutor) - .scheduler(scheduler) + .consumerExecutor(edqsExecutors.getConsumersExecutor()) + .taskExecutor(edqsExecutors.getConsumerTaskExecutor()) + .scheduler(edqsExecutors.getScheduler()) .uncaughtErrorHandler(errorHandler) .build(); - stateService.init(eventConsumer); + responseTemplate = queueFactory.createEdqsResponseTemplate(this); - responseTemplate = queueFactory.createEdqsResponseTemplate(); - } - - @AfterStartUp(order = 1) - public void start() { - responseTemplate.launch(this); + stateService.init(eventConsumer, List.of(responseTemplate.getRequestConsumer())); } @EventListener @@ -165,10 +153,8 @@ public class EdqsProcessor implements TbQueueHandler, } try { Set newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS)); - stateService.process(withTopic(newPartitions, topicService.buildTopicName(config.getStateTopic()))); - // eventsConsumer's partitions are updated by stateService - responseTemplate.subscribe(withTopic(newPartitions, topicService.buildTopicName(config.getRequestsTopic()))); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template + // partitions for event and request consumers are updated by stateService Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); if (CollectionsUtil.isNotEmpty(oldPartitions)) { @@ -243,22 +229,20 @@ public class EdqsProcessor implements TbQueueHandler, TenantId tenantId = getTenantId(edqsMsg); ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); - String key = eventMsg.getKey(); Long version = eventMsg.hasVersion() ? eventMsg.getVersion() : null; + EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), false); if (version != null) { - if (!versionsStore.isNew(key, version)) { + if (!versionsStore.isNew(mapper.getKey(object), version)) { return; } } else if (!ObjectType.unversionedTypes.contains(objectType)) { - log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key); + log.warn("[{}] {} doesn't have version: {}", tenantId, objectType, object); } if (backup) { - stateService.save(tenantId, objectType, key, eventType, edqsMsg); + stateService.save(tenantId, objectType, object.stringKey(), eventType, edqsMsg); } - EdqsObject object = converter.deserialize(objectType, eventMsg.getData().toByteArray()); - log.debug("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version); int count = counter.incrementAndGet(); if (count % 100000 == 0) { log.info("Processed {} events", count); @@ -270,6 +254,7 @@ public class EdqsProcessor implements TbQueueHandler, .eventType(eventType) .object(object) .build(); + log.debug("Processing event: {}", event); repository.processEvent(event); } } @@ -292,11 +277,6 @@ public class EdqsProcessor implements TbQueueHandler, eventConsumer.awaitStop(); responseTemplate.stop(); stateService.stop(); - - consumersExecutor.shutdownNow(); - taskExecutor.shutdownNow(); - scheduler.shutdownNow(); - requestExecutor.shutdownNow(); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/EntityNameQueryProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/EntityNameQueryProcessor.java index ec88db4d0f..fbb36cf2a0 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/EntityNameQueryProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/EntityNameQueryProcessor.java @@ -35,7 +35,7 @@ public class EntityNameQueryProcessor extends AbstractSimpleQueryProcessor> eventConsumer); + void init(PartitionedQueueConsumerManager> eventConsumer, List> otherConsumers); void process(Set partitions); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index ddbdc3253a..0c90235ecd 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -22,11 +22,13 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; +import org.thingsboard.server.common.data.edqs.EdqsObject; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.edqs.processor.EdqsProcessor; import org.thingsboard.server.edqs.processor.EdqsProducer; +import org.thingsboard.server.edqs.util.EdqsMapper; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; @@ -34,16 +36,18 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.common.state.KafkaQueueStateService; -import org.thingsboard.server.queue.common.state.QueueStateService; +import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; +import org.thingsboard.server.queue.edqs.EdqsExecutors; import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -60,23 +64,28 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; private final KafkaEdqsQueueFactory queueFactory; + private final DiscoveryService discoveryService; + private final EdqsExecutors edqsExecutors; + private final EdqsMapper mapper; private final TopicService topicService; @Autowired @Lazy private EdqsProcessor edqsProcessor; private PartitionedQueueConsumerManager> stateConsumer; - private QueueStateService, TbProtoQueueMsg> queueStateService; + private KafkaQueueStateService, TbProtoQueueMsg> queueStateService; private QueueConsumerManager> eventsToBackupConsumer; private EdqsProducer stateProducer; + private VersionsStore versionsStore; - private final VersionsStore versionsStore = new VersionsStore(); private final AtomicInteger stateReadCount = new AtomicInteger(); private final AtomicInteger eventsReadCount = new AtomicInteger(); - private Boolean ready; + + private boolean ready = false; @Override - public void init(PartitionedQueueConsumerManager> eventConsumer) { + public void init(PartitionedQueueConsumerManager> eventConsumer, List> otherConsumers) { + versionsStore = new VersionsStore(config.getVersionsCacheTtl()); TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin(); stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) @@ -98,9 +107,9 @@ public class KafkaEdqsStateService implements EdqsStateService { }) .consumerCreator((config, tpi) -> queueFactory.createEdqsStateConsumer()) .queueAdmin(queueAdmin) - .consumerExecutor(eventConsumer.getConsumerExecutor()) - .taskExecutor(eventConsumer.getTaskExecutor()) - .scheduler(eventConsumer.getScheduler()) + .consumerExecutor(edqsExecutors.getConsumersExecutor()) + .taskExecutor(edqsExecutors.getConsumerTaskExecutor()) + .scheduler(edqsExecutors.getScheduler()) .uncaughtErrorHandler(edqsProcessor.getErrorHandler()) .build(); @@ -119,22 +128,25 @@ public class KafkaEdqsStateService implements EdqsStateService { if (msg.hasEventMsg()) { EdqsEventMsg eventMsg = msg.getEventMsg(); - String key = eventMsg.getKey(); - int count = eventsReadCount.incrementAndGet(); - if (count % 100000 == 0) { - log.info("[events-to-backup] Processed {} msgs", count); - } + ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); + EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), true); + if (eventMsg.hasVersion()) { - if (!versionsStore.isNew(key, eventMsg.getVersion())) { + if (!versionsStore.isNew(mapper.getKey(object), eventMsg.getVersion())) { continue; } } TenantId tenantId = getTenantId(msg); - ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); + String key = object.stringKey(); log.trace("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); - stateProducer.send(tenantId, objectType, key, msg); + stateProducer.send(tenantId, objectType, object.stringKey(), msg); + + int count = eventsReadCount.incrementAndGet(); + if (count % 100000 == 0) { + log.info("[events-to-backup] Processed {} msgs", count); + } } } catch (Throwable t) { log.error("Failed to process message: {}", queueMsg, t); @@ -143,7 +155,7 @@ public class KafkaEdqsStateService implements EdqsStateService { consumer.commit(); }) .consumerCreator(() -> eventsToBackupKafkaConsumer) - .consumerExecutor(eventConsumer.getConsumerExecutor()) + .consumerExecutor(edqsExecutors.getConsumersExecutor()) .threadPrefix("edqs-events-to-backup") .build(); @@ -155,6 +167,7 @@ public class KafkaEdqsStateService implements EdqsStateService { queueStateService = KafkaQueueStateService., TbProtoQueueMsg>builder() .eventConsumer(eventConsumer) .stateConsumer(stateConsumer) + .otherConsumers(otherConsumers) .eventsStartOffsetsProvider(() -> { // taking start offsets for events topics from the events-to-backup consumer group, // since eventConsumer doesn't use consumer group management and thus offset tracking @@ -185,7 +198,10 @@ public class KafkaEdqsStateService implements EdqsStateService { eventsToBackupConsumer.subscribe(allPartitions); eventsToBackupConsumer.launch(); } - queueStateService.update(new QueueKey(ServiceType.EDQS), partitions); + queueStateService.update(new QueueKey(ServiceType.EDQS), partitions, () -> { + ready = true; + discoveryService.setReady(true); + }); } @Override @@ -195,13 +211,7 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public boolean isReady() { - if (ready == null) { - Set partitionsInProgress = queueStateService.getPartitionsInProgress(); - if (partitionsInProgress != null && partitionsInProgress.isEmpty()) { - ready = true; // once true - always true, not to change readiness status on each repartitioning - } - } - return ready != null && ready; + return ready; } private TenantId getTenantId(ToEdqsMsg edqsMsg) { diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index cde21edfaf..00bed5e91d 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -29,8 +29,10 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent; +import java.util.List; import java.util.Set; import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; @@ -42,20 +44,24 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop public class LocalEdqsStateService implements EdqsStateService { private final EdqsRocksDb db; + private final DiscoveryService discoveryService; @Autowired @Lazy private EdqsProcessor processor; private PartitionedQueueConsumerManager> eventConsumer; - private Set partitions; + private List> otherConsumers; + + private boolean ready = false; @Override - public void init(PartitionedQueueConsumerManager> eventConsumer) { + public void init(PartitionedQueueConsumerManager> eventConsumer, List> otherConsumers) { this.eventConsumer = eventConsumer; + this.otherConsumers = otherConsumers; } @Override public void process(Set partitions) { - if (this.partitions == null) { + if (!ready) { db.forEach((key, value) -> { try { ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value); @@ -67,8 +73,13 @@ public class LocalEdqsStateService implements EdqsStateService { }); log.info("Restore completed"); } + ready = true; + discoveryService.setReady(true); + eventConsumer.update(withTopic(partitions, eventConsumer.getTopic())); - this.partitions = partitions; + for (PartitionedQueueConsumerManager consumer : otherConsumers) { + consumer.update(withTopic(partitions, consumer.getTopic())); + } } @Override @@ -87,7 +98,7 @@ public class LocalEdqsStateService implements EdqsStateService { @Override public boolean isReady() { - return partitions != null; + return ready; } @Override diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java similarity index 80% rename from common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java rename to common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java index 9a84436f36..0d3fc35b08 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.protobuf.ByteString; import lombok.RequiredArgsConstructor; @@ -36,6 +35,7 @@ import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.AttributeKv; import org.thingsboard.server.common.data.edqs.DataPoint; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; import org.thingsboard.server.common.data.edqs.Entity; import org.thingsboard.server.common.data.edqs.LatestTsKv; import org.thingsboard.server.common.data.edqs.fields.FieldsUtil; @@ -52,6 +52,7 @@ import org.thingsboard.server.edqs.data.dp.DoubleDataPoint; import org.thingsboard.server.edqs.data.dp.JsonDataPoint; import org.thingsboard.server.edqs.data.dp.LongDataPoint; import org.thingsboard.server.edqs.data.dp.StringDataPoint; +import org.thingsboard.server.edqs.repo.KeyDictionary; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DataPointProto; import org.xerial.snappy.Snappy; @@ -65,19 +66,29 @@ import java.util.UUID; @Service @RequiredArgsConstructor @Slf4j -public class EdqsConverter { +public class DefaultEdqsMapper implements EdqsMapper { private final EdqsStatsService edqsStatsService; @Value("${queue.edqs.string_compression_length_threshold:512}") private int stringCompressionLengthThreshold; - private final Map> converters = new HashMap<>(); - private final Converter defaultConverter = new JsonConverter<>(Entity.class); + private final Map> mappers = new HashMap<>(); + private final Mapper defaultMapper = new JsonMapper<>(Entity.class) { + @Override + public EdqsObjectKey getKey(Entity entity) { + return new Entity.Key(entity.getFields().getId()); + } + }; { - converters.put(ObjectType.RELATION, new JsonConverter<>(EntityRelation.class)); - converters.put(ObjectType.ATTRIBUTE_KV, new Converter() { + mappers.put(ObjectType.RELATION, new JsonMapper<>(EntityRelation.class) { + @Override + public EdqsObjectKey getKey(EntityRelation relation) { + return new EntityRelation.Key(relation.getFrom().getId(), relation.getTo().getId(), relation.getTypeGroup(), relation.getType()); + } + }); + mappers.put(ObjectType.ATTRIBUTE_KV, new Mapper() { @Override public byte[] serialize(ObjectType type, AttributeKv attributeKv) { var proto = TransportProtos.AttributeKvProto.newBuilder() @@ -94,12 +105,12 @@ public class EdqsConverter { } @Override - public AttributeKv deserialize(ObjectType type, byte[] bytes) throws Exception { + public AttributeKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception { TransportProtos.AttributeKvProto proto = TransportProtos.AttributeKvProto.parseFrom(bytes); EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); AttributeScope scope = AttributeScope.values()[proto.getScope().getNumber()]; - DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null; + DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint()); return AttributeKv.builder() .entityId(entityId) .scope(scope) @@ -108,8 +119,13 @@ public class EdqsConverter { .dataPoint(dataPoint) .build(); } + + @Override + public EdqsObjectKey getKey(AttributeKv attributeKv) { + return new AttributeKv.Key(attributeKv.getEntityId().getId(), attributeKv.getScope(), KeyDictionary.get(attributeKv.getKey())); + } }); - converters.put(ObjectType.LATEST_TS_KV, new Converter() { + mappers.put(ObjectType.LATEST_TS_KV, new Mapper() { @Override public byte[] serialize(ObjectType type, LatestTsKv latestTsKv) { var proto = TransportProtos.LatestTsKvProto.newBuilder() @@ -125,11 +141,11 @@ public class EdqsConverter { } @Override - public LatestTsKv deserialize(ObjectType type, byte[] bytes) throws Exception { + public LatestTsKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception { TransportProtos.LatestTsKvProto proto = TransportProtos.LatestTsKvProto.parseFrom(bytes); EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null; + DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint()); return LatestTsKv.builder() .entityId(entityId) .key(proto.getKey()) @@ -137,6 +153,11 @@ public class EdqsConverter { .dataPoint(dataPoint) .build(); } + + @Override + public EdqsObjectKey getKey(LatestTsKv latestTsKv) { + return new LatestTsKv.Key(latestTsKv.getEntityId().getId(), KeyDictionary.get(latestTsKv.getKey())); + } }); } @@ -223,30 +244,30 @@ public class EdqsConverter { @SuppressWarnings("unchecked") @SneakyThrows - public byte[] serialize(ObjectType type, T value) { - Converter converter = (Converter) converters.get(type); - if (converter != null) { - return converter.serialize(type, value); - } else { - return defaultConverter.serialize(type, (Entity) value); - } + public byte[] serialize(T value) { + ObjectType type = value.type(); + Mapper mapper = (Mapper) mappers.getOrDefault(type, defaultMapper); + return mapper.serialize(type, value); } @SneakyThrows - public EdqsObject deserialize(ObjectType type, byte[] bytes) { - Converter converter = converters.get(type); - if (converter != null) { - return converter.deserialize(type, bytes); - } else { - return defaultConverter.deserialize(type, bytes); - } + public EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey) { + Mapper mapper = mappers.getOrDefault(type, defaultMapper); + return mapper.deserialize(type, bytes, onlyKey); + } + + @SuppressWarnings("unchecked") + @SneakyThrows + public EdqsObjectKey getKey(T object) { + Mapper mapper = (Mapper) mappers.getOrDefault(object.type(), defaultMapper); + return mapper.getKey(object); } @RequiredArgsConstructor - private static class JsonConverter implements Converter { + private static abstract class JsonMapper implements Mapper { private static final SimpleModule module = new SimpleModule(); - private static final ObjectMapper mapper = JsonMapper.builder() + private static final ObjectMapper mapper = com.fasterxml.jackson.databind.json.JsonMapper.builder() .visibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) .visibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE) .visibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE) @@ -267,17 +288,19 @@ public class EdqsConverter { @SneakyThrows @Override - public T deserialize(ObjectType objectType, byte[] bytes) { + public T deserialize(ObjectType objectType, byte[] bytes, boolean onlyKey) { return mapper.readValue(bytes, this.type); } } - private interface Converter { + private interface Mapper { byte[] serialize(ObjectType type, T value) throws Exception; - T deserialize(ObjectType type, byte[] bytes) throws Exception; + T deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception; + + EdqsObjectKey getKey(T object); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java new file mode 100644 index 0000000000..bd6ed22cd1 --- /dev/null +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.edqs.util; + +import org.thingsboard.server.common.data.ObjectType; +import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; + +public interface EdqsMapper { + + byte[] serialize(T value); + + EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey); + + EdqsObjectKey getKey(T object); + +} diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java index 798ac0603d..ba3263eec2 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java @@ -18,6 +18,7 @@ package org.thingsboard.server.edqs.util; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,18 +26,22 @@ import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public class VersionsStore { - private final Cache versions = Caffeine.newBuilder() - .expireAfterWrite(24, TimeUnit.HOURS) - .build(); + private final Cache versions; - public boolean isNew(String key, Long version) { + public VersionsStore(int ttlMinutes) { + this.versions = Caffeine.newBuilder() + .expireAfterWrite(ttlMinutes, TimeUnit.MINUTES) + .build(); + } + + public boolean isNew(EdqsObjectKey key, Long version) { AtomicBoolean isNew = new AtomicBoolean(false); versions.asMap().compute(key, (k, prevVersion) -> { if (prevVersion == null || prevVersion <= version) { isNew.set(true); return version; } else { - log.info("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion); + log.debug("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion); return prevVersion; } }); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsApiService.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsApiService.java index 05864fe863..0fe9971712 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsApiService.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsApiService.java @@ -25,12 +25,6 @@ public interface EdqsApiService { ListenableFuture processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request); - boolean isEnabled(); - - void setEnabled(boolean enabled); - boolean isSupported(); - boolean isAutoEnable(); - } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsService.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsService.java index 32ff57e3e0..faad9a0f4a 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsService.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.msg.edqs; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsState; import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg; import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest; import org.thingsboard.server.common.data.id.EntityId; @@ -36,4 +37,8 @@ public interface EdqsService { void processSystemMsg(ToCoreEdqsMsg request); + boolean isApiEnabled(); + + EdqsState getState(); + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 2a97fd35d0..65694f2fb6 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -89,6 +89,7 @@ message ServiceInfo { SystemInfoProto systemInfo = 10; repeated string assignedTenantProfiles = 11; string label = 12; + bool ready = 13; } message SystemInfoProto { @@ -1836,7 +1837,6 @@ message FromEdqsMsg { } message EdqsEventMsg { - string key = 1; string objectType = 2; bytes data = 3; string eventType = 4; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java index 1beb505595..c6f76a5000 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java @@ -91,7 +91,6 @@ public class DefaultTbQueueRequestTemplate requestTemplate; private final TbQueueProducer responseTemplate; - private final ConcurrentMap pendingRequests; private final ExecutorService loopExecutor; private final ScheduledExecutorService timeoutExecutor; private final ExecutorService callbackExecutor; @@ -67,7 +64,6 @@ public class DefaultTbQueueResponseTemplate(); this.maxPendingRequests = maxPendingRequests; this.pollInterval = pollInterval; this.requestTimeout = requestTimeout; @@ -89,7 +85,6 @@ public class DefaultTbQueueResponseTemplate handler) { - this.responseTemplate.init(); loopExecutor.submit(() -> { while (!stopped) { try { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java new file mode 100644 index 0000000000..d1e1be708f --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java @@ -0,0 +1,164 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.stats.MessagesStats; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueHandler; +import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +@Slf4j +public class PartitionedQueueResponseTemplate extends AbstractTbQueueTemplate { + + @Getter + private final PartitionedQueueConsumerManager requestConsumer; + private final TbQueueProducer responseProducer; + + private final TbQueueHandler handler; + private final long pollInterval; + private final int maxPendingRequests; + private final long requestTimeout; + private final MessagesStats stats; + + private final ScheduledExecutorService scheduler; + private final ExecutorService callbackExecutor; + + private final AtomicInteger pendingRequestCount = new AtomicInteger(); + + @Builder + public PartitionedQueueResponseTemplate(String key, + TbQueueHandler handler, + String requestsTopic, + Function> consumerCreator, + TbQueueProducer responseProducer, + long pollInterval, + long requestTimeout, + int maxPendingRequests, + ExecutorService consumerExecutor, + ExecutorService callbackExecutor, + ExecutorService consumerTaskExecutor, + MessagesStats stats) { + this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(key + "-queue-response-template-scheduler"); + this.callbackExecutor = callbackExecutor; + this.handler = handler; + this.requestConsumer = PartitionedQueueConsumerManager.create() + .queueKey(key + "-requests") + .topic(requestsTopic) + .pollInterval(pollInterval) + .msgPackProcessor((requests, consumer, config) -> processRequests(requests, consumer)) + .consumerCreator((config, tpi) -> consumerCreator.apply(tpi)) + .consumerExecutor(consumerExecutor) + .scheduler(scheduler) + .taskExecutor(consumerTaskExecutor) + .build(); + this.responseProducer = responseProducer; + this.pollInterval = pollInterval; + this.maxPendingRequests = maxPendingRequests; + this.requestTimeout = requestTimeout; + this.stats = stats; + } + + private void processRequests(List requests, TbQueueConsumer consumer) { + while (pendingRequestCount.get() >= maxPendingRequests) { + try { + Thread.sleep(pollInterval); + } catch (InterruptedException e) { + log.trace("Failed to wait until the server has capacity to handle new requests", e); + } + } + + requests.forEach(request -> { + long currentTime = System.currentTimeMillis(); + long expireTs = bytesToLong(request.getHeaders().get(EXPIRE_TS_HEADER)); + if (expireTs >= currentTime) { + byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER); + if (requestIdHeader == null) { + log.error("[{}] Missing requestId in header", request); + return; + } + byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER); + if (responseTopicHeader == null) { + log.error("[{}] Missing response topic in header", request); + return; + } + UUID requestId = bytesToUuid(requestIdHeader); + String responseTopic = bytesToString(responseTopicHeader); + try { + pendingRequestCount.getAndIncrement(); + stats.incrementTotal(); + AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request), + response -> { + pendingRequestCount.decrementAndGet(); + response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId)); + responseProducer.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null); + stats.incrementSuccessful(); + }, + e -> { + pendingRequestCount.decrementAndGet(); + if (e.getCause() != null && e.getCause() instanceof TimeoutException) { + log.warn("[{}] Timeout to process the request: {}", requestId, request, e); + } else { + log.trace("[{}] Failed to process the request: {}", requestId, request, e); + } + stats.incrementFailed(); + }, + requestTimeout, + scheduler, + callbackExecutor); + } catch (Throwable e) { + pendingRequestCount.decrementAndGet(); + log.warn("[{}] Failed to process the request: {}", requestId, request, e); + stats.incrementFailed(); + } + } + }); + consumer.commit(); + } + + public void subscribe(Set partitions) { + requestConsumer.update(partitions); + } + + public void stop() { + if (requestConsumer != null) { + requestConsumer.stop(); + requestConsumer.awaitStop(); + } + if (responseProducer != null) { + responseProducer.stop(); + } + if (scheduler != null) { + scheduler.shutdownNow(); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index 2233855a37..86e0721dd8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -25,7 +25,6 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask; -import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import java.util.Collection; @@ -50,7 +49,7 @@ import java.util.function.Function; public class MainQueueConsumerManager { @Getter - protected final QueueKey queueKey; + protected final Object queueKey; @Getter protected C config; protected final MsgPackProcessor msgPackProcessor; @@ -72,7 +71,7 @@ public class MainQueueConsumerManager msgPackProcessor, BiFunction> consumerCreator, ExecutorService consumerExecutor, diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java index 1b19fcab0e..42d5b6bb18 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -26,7 +26,6 @@ import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.AddPartitionsTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeletePartitionsTask; import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.RemovePartitionsTask; -import org.thingsboard.server.queue.discovery.QueueKey; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -44,7 +43,7 @@ public class PartitionedQueueConsumerManager extends MainQ private final String topic; @Builder(builderMethodName = "create") // not to conflict with super.builder() - public PartitionedQueueConsumerManager(QueueKey queueKey, String topic, long pollInterval, MsgPackProcessor msgPackProcessor, + public PartitionedQueueConsumerManager(Object queueKey, String topic, long pollInterval, MsgPackProcessor msgPackProcessor, BiFunction> consumerCreator, TbQueueAdmin queueAdmin, ExecutorService consumerExecutor, ScheduledExecutorService scheduler, ExecutorService taskExecutor, Consumer uncaughtErrorHandler) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java index be019caaa7..be379fb76d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java @@ -18,10 +18,12 @@ package org.thingsboard.server.queue.common.state; import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; +import java.util.Collections; + public class DefaultQueueStateService extends QueueStateService { public DefaultQueueStateService(PartitionedQueueConsumerManager eventConsumer) { - super(eventConsumer); + super(eventConsumer, Collections.emptyList()); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java index bf02afe86c..2a38c9a86c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java @@ -22,8 +22,11 @@ import org.thingsboard.server.queue.TbQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.discovery.QueueKey; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; @@ -34,17 +37,20 @@ public class KafkaQueueStateService private final PartitionedQueueConsumerManager stateConsumer; private final Supplier> eventsStartOffsetsProvider; + private final Set partitionsInProgress = ConcurrentHashMap.newKeySet(); + @Builder public KafkaQueueStateService(PartitionedQueueConsumerManager eventConsumer, PartitionedQueueConsumerManager stateConsumer, + List> otherConsumers, Supplier> eventsStartOffsetsProvider) { - super(eventConsumer); + super(eventConsumer, otherConsumers != null ? otherConsumers : Collections.emptyList()); this.stateConsumer = stateConsumer; this.eventsStartOffsetsProvider = eventsStartOffsetsProvider; } @Override - protected void addPartitions(QueueKey queueKey, Set partitions) { + protected void addPartitions(QueueKey queueKey, Set partitions, Runnable whenAllProcessed) { Map eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states Set statePartitions = withTopic(partitions, stateConsumer.getTopic()); @@ -57,11 +63,17 @@ public class KafkaQueueStateService log.info("Finished partition {} (still in progress: {})", statePartition, partitionsInProgress); if (partitionsInProgress.isEmpty()) { log.info("All partitions processed"); + if (whenAllProcessed != null) { + whenAllProcessed.run(); + } } TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic()); if (this.partitions.get(queueKey).contains(eventPartition)) { eventConsumer.addPartitions(Set.of(eventPartition), null, eventsStartOffsets != null ? eventsStartOffsets::get : null); + for (PartitionedQueueConsumerManager consumer : otherConsumers) { + consumer.addPartitions(Set.of(statePartition.withTopic(consumer.getTopic()))); + } } } finally { readLock.unlock(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java index 29426fab63..e58d5eb036 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java @@ -25,9 +25,9 @@ import org.thingsboard.server.queue.discovery.QueueKey; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -37,19 +37,19 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop public abstract class QueueStateService { protected final PartitionedQueueConsumerManager eventConsumer; + protected final List> otherConsumers; @Getter protected final Map> partitions = new HashMap<>(); - protected final Set partitionsInProgress = ConcurrentHashMap.newKeySet(); - protected boolean initialized; protected final ReadWriteLock partitionsLock = new ReentrantReadWriteLock(); - protected QueueStateService(PartitionedQueueConsumerManager eventConsumer) { + protected QueueStateService(PartitionedQueueConsumerManager eventConsumer, List> otherConsumers) { this.eventConsumer = eventConsumer; + this.otherConsumers = otherConsumers; } - public void update(QueueKey queueKey, Set newPartitions) { + public void update(QueueKey queueKey, Set newPartitions, Runnable whenAllProcessed) { newPartitions = withTopic(newPartitions, eventConsumer.getTopic()); var writeLock = partitionsLock.writeLock(); writeLock.lock(); @@ -71,17 +71,29 @@ public abstract class QueueStateService partitions) { + protected void addPartitions(QueueKey queueKey, Set partitions, Runnable whenAllProcessed) { + if (whenAllProcessed != null) { + whenAllProcessed.run(); + } eventConsumer.addPartitions(partitions); + for (PartitionedQueueConsumerManager consumer : otherConsumers) { + consumer.addPartitions(withTopic(partitions, consumer.getTopic())); + } } protected void removePartitions(QueueKey queueKey, Set partitions) { eventConsumer.removePartitions(partitions); + for (PartitionedQueueConsumerManager consumer : otherConsumers) { + consumer.removePartitions(withTopic(partitions, consumer.getTopic())); + } } public void delete(Set partitions) { @@ -100,10 +112,9 @@ public abstract class QueueStateService partitions) { eventConsumer.delete(withTopic(partitions, eventConsumer.getTopic())); - } - - public Set getPartitionsInProgress() { - return initialized ? partitionsInProgress : null; + for (PartitionedQueueConsumerManager consumer : otherConsumers) { + consumer.removePartitions(withTopic(partitions, consumer.getTopic())); + } } public void stop() { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index 609d3f8eee..c2705cb1ab 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -68,6 +68,8 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { private List serviceTypes; private ServiceInfo serviceInfo; + private boolean ready = true; + @PostConstruct public void init() { if (StringUtils.isEmpty(serviceId)) { @@ -87,6 +89,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { assignedTenantProfiles = Collections.emptySet(); } if (serviceTypes.contains(ServiceType.EDQS)) { + ready = false; if (StringUtils.isBlank(edqsConfig.getLabel())) { edqsConfig.setLabel(serviceId); } @@ -128,9 +131,17 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList())); } builder.setLabel(edqsConfig.getLabel()); + builder.setReady(ready); return serviceInfo = builder.build(); } + @Override + public boolean setReady(boolean ready) { + boolean changed = this.ready != ready; + this.ready = ready; + return changed; + } + private TransportProtos.SystemInfoProto getCurrentSystemInfoProto() { TransportProtos.SystemInfoProto.Builder builder = TransportProtos.SystemInfoProto.newBuilder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DiscoveryService.java index 5d309014dd..28d61bde0f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DiscoveryService.java @@ -25,4 +25,6 @@ public interface DiscoveryService { boolean isMonolith(); + void setReady(boolean ready); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DummyDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DummyDiscoveryService.java index 442b845e81..8a4e9102e2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DummyDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DummyDiscoveryService.java @@ -55,4 +55,13 @@ public class DummyDiscoveryService implements DiscoveryService { public boolean isMonolith() { return true; } + + @Override + public void setReady(boolean ready) { + boolean changed = serviceInfoProvider.setReady(ready); + if (changed) { + serviceInfoProvider.generateNewServiceInfoWithCurrentSystemInfo(); + } + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java index 51a6d808dd..d7182ca2eb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java @@ -35,4 +35,6 @@ public interface TbServiceInfoProvider { Set getAssignedTenantProfiles(); + boolean setReady(boolean ready); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index cf9f27ee39..d73f531b5b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -177,6 +177,19 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } } + @Override + public void setReady(boolean ready) { + log.debug("Marking current service as {}", ready ? "ready" : "NOT ready"); + boolean changed = serviceInfoProvider.setReady(ready); + if (changed) { + try { + publishCurrentServer(); + } catch (Exception e) { + log.error("Failed to update server readiness status", e); + } + } + } + private boolean currentServerExists() { if (nodePath == null) { return false; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java index 3c927f135b..18f46bd358 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java @@ -44,6 +44,10 @@ public class EdqsConfig { private int maxPendingRequests; @Value("${queue.edqs.max_request_timeout:20000}") private int maxRequestTimeout; + @Value("${queue.edqs.request_executor_size:50}") + private int requestExecutorSize; + @Value("${queue.edqs.versions_cache_ttl:60}") + private int versionsCacheTtl; public String getLabel() { if (partitioningStrategy == EdqsPartitioningStrategy.NONE) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsExecutors.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsExecutors.java new file mode 100644 index 0000000000..8e804410fe --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsExecutors.java @@ -0,0 +1,70 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.edqs; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.common.util.ThingsBoardThreadFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@Lazy +@Component +@Getter +@RequiredArgsConstructor +public class EdqsExecutors { + + private final EdqsConfig edqsConfig; + + private ExecutorService consumersExecutor; + private ExecutorService consumerTaskExecutor; + private ScheduledExecutorService scheduler; + private ListeningExecutorService requestExecutor; + + @PostConstruct + private void init() { + consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer")); + consumerTaskExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-task-executor"); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-scheduler"); + requestExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(edqsConfig.getRequestExecutorSize(), "edqs-requests")); + } + + @PreDestroy + private void destroy() { + if (consumersExecutor != null) { + consumersExecutor.shutdownNow(); + } + if (consumerTaskExecutor != null) { + consumerTaskExecutor.shutdownNow(); + } + if (scheduler != null) { + scheduler.shutdownNow(); + } + if (requestExecutor != null) { + requestExecutor.shutdownNow(); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java index 5c0d68779a..ed021e3380 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java @@ -19,8 +19,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.TbQueueResponseTemplate; +import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; public interface EdqsQueueFactory { @@ -33,7 +34,7 @@ public interface EdqsQueueFactory { TbQueueProducer> createEdqsStateProducer(); - TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate(); + PartitionedQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate(TbQueueHandler, TbProtoQueueMsg> handler); TbQueueAdmin getEdqsQueueAdmin(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java index 8c670e66c0..6c7618f7db 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java @@ -17,16 +17,15 @@ package org.thingsboard.server.queue.edqs; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.TbQueueResponseTemplate; -import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate; +import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.memory.InMemoryStorage; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; @@ -39,6 +38,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { private final InMemoryStorage storage; private final EdqsConfig edqsConfig; + private final EdqsExecutors edqsExecutors; private final StatsFactory statsFactory; private final TbQueueAdmin queueAdmin; @@ -63,17 +63,21 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory { } @Override - public TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate() { - TbQueueConsumer> requestConsumer = new InMemoryTbQueueConsumer<>(storage, edqsConfig.getRequestsTopic()); + public PartitionedQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate(TbQueueHandler, TbProtoQueueMsg> handler) { TbQueueProducer> responseProducer = new InMemoryTbQueueProducer<>(storage, edqsConfig.getResponsesTopic()); - return DefaultTbQueueResponseTemplate., TbProtoQueueMsg>builder() - .requestTemplate(requestConsumer) - .responseTemplate(responseProducer) - .maxPendingRequests(edqsConfig.getMaxPendingRequests()) - .requestTimeout(edqsConfig.getMaxRequestTimeout()) + return PartitionedQueueResponseTemplate., TbProtoQueueMsg>builder() + .key("edqs") + .handler(handler) + .requestsTopic(edqsConfig.getRequestsTopic()) + .consumerCreator(tpi -> new InMemoryTbQueueConsumer<>(storage, edqsConfig.getRequestsTopic())) + .responseProducer(responseProducer) .pollInterval(edqsConfig.getPollInterval()) + .requestTimeout(edqsConfig.getMaxRequestTimeout()) + .maxPendingRequests(edqsConfig.getMaxPendingRequests()) + .consumerExecutor(edqsExecutors.getConsumersExecutor()) + .callbackExecutor(edqsExecutors.getRequestExecutor()) + .consumerTaskExecutor(edqsExecutors.getConsumerTaskExecutor()) .stats(statsFactory.createMessagesStats(StatsType.EDQS.getName())) - .executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs")) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java index ab88943b10..f071c30942 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java @@ -16,14 +16,13 @@ package org.thingsboard.server.queue.edqs; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; +import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.TbQueueResponseTemplate; -import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate; +import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; @@ -45,6 +44,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { private final TbKafkaAdmin edqsRequestsAdmin; private final TbKafkaAdmin edqsStateAdmin; private final EdqsConfig edqsConfig; + private final EdqsExecutors edqsExecutors; private final TbServiceInfoProvider serviceInfoProvider; private final TbKafkaConsumerStatsService consumerStatsService; private final TopicService topicService; @@ -53,7 +53,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { private final AtomicInteger consumerCounter = new AtomicInteger(); public KafkaEdqsQueueFactory(TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs topicConfigs, - EdqsConfig edqsConfig, TbServiceInfoProvider serviceInfoProvider, + EdqsConfig edqsConfig, EdqsExecutors edqsExecutors, TbServiceInfoProvider serviceInfoProvider, TbKafkaConsumerStatsService consumerStatsService, TopicService topicService, StatsFactory statsFactory) { this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsEventsConfigs()); @@ -61,6 +61,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { this.edqsStateAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsStateConfigs()); this.kafkaSettings = kafkaSettings; this.edqsConfig = edqsConfig; + this.edqsExecutors = edqsExecutors; this.serviceInfoProvider = serviceInfoProvider; this.consumerStatsService = consumerStatsService; this.topicService = topicService; @@ -116,25 +117,29 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory { } @Override - public TbQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate() { - var requestConsumer = createEdqsMsgConsumer(edqsConfig.getRequestsTopic(), - "edqs-requests-consumer-" + serviceInfoProvider.getServiceId(), - "edqs-requests-consumer-group", - false, edqsRequestsAdmin); + public PartitionedQueueResponseTemplate, TbProtoQueueMsg> createEdqsResponseTemplate(TbQueueHandler, TbProtoQueueMsg> handler) { var responseProducer = TbKafkaProducerTemplate.>builder() .settings(kafkaSettings) .clientId("edqs-response-producer-" + serviceInfoProvider.getServiceId()) .defaultTopic(topicService.buildTopicName(edqsConfig.getResponsesTopic())) .admin(edqsRequestsAdmin) .build(); - return DefaultTbQueueResponseTemplate., TbProtoQueueMsg>builder() - .requestTemplate(requestConsumer) - .responseTemplate(responseProducer) - .maxPendingRequests(edqsConfig.getMaxPendingRequests()) - .requestTimeout(edqsConfig.getMaxRequestTimeout()) + return PartitionedQueueResponseTemplate., TbProtoQueueMsg>builder() + .key("edqs") + .handler(handler) + .requestsTopic(topicService.buildTopicName(edqsConfig.getRequestsTopic())) + .consumerCreator(tpi -> createEdqsMsgConsumer(edqsConfig.getRequestsTopic(), + "edqs-requests-consumer-" + serviceInfoProvider.getServiceId() + "-" + tpi.getPartition().orElse(999), + "edqs-requests-consumer-group", + false, edqsRequestsAdmin)) + .responseProducer(responseProducer) .pollInterval(edqsConfig.getPollInterval()) + .requestTimeout(edqsConfig.getMaxRequestTimeout()) + .maxPendingRequests(edqsConfig.getMaxPendingRequests()) + .consumerExecutor(edqsExecutors.getConsumersExecutor()) + .callbackExecutor(edqsExecutors.getRequestExecutor()) + .consumerTaskExecutor(edqsExecutors.getConsumerTaskExecutor()) .stats(statsFactory.createMessagesStats(StatsType.EDQS.getName())) - .executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs")) .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java index 9d1088e188..7a9c01b72f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java @@ -93,8 +93,7 @@ public class TbKafkaConsumerStatsService { log.info("[{}] Topic partitions with lag: [{}].", groupId, builder.toString()); } } catch (Exception e) { - log.warn("[{}] Failed to get consumer group stats. Reason - {}.", groupId, e.getMessage()); - log.trace("Detailed error: ", e); + log.warn("[{}] Failed to get consumer group stats", groupId, e); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java index cac6f2ea1e..f0b3694d74 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java @@ -75,10 +75,6 @@ public class TbKafkaProducerTemplate implements TbQueuePro topics = ConcurrentHashMap.newKeySet(); } - @Override - public void init() { - } - void addAnalyticHeaders(List
headers) { headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8))); headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8))); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java index 49c52ed21a..43d820bc7e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java @@ -33,11 +33,6 @@ public class InMemoryTbQueueProducer implements TbQueuePro this.defaultTopic = defaultTopic; } - @Override - public void init() { - - } - @Override public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { boolean result = storage.put(tpi.getFullTopicName(), msg); diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplateTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplateTest.java index 9e493220ca..e5bb62bc04 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplateTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplateTest.java @@ -37,11 +37,23 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.BDDMockito.*; -import static org.mockito.Mockito.*; +import static org.mockito.BDDMockito.RETURNS_DEEP_STUBS; +import static org.mockito.BDDMockito.atLeastOnce; +import static org.mockito.BDDMockito.lenient; +import static org.mockito.BDDMockito.mock; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.times; +import static org.mockito.BDDMockito.verify; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willDoNothing; +import static org.mockito.BDDMockito.willReturn; import static org.mockito.hamcrest.MockitoHamcrest.longThat; @Slf4j @@ -96,7 +108,6 @@ public class DefaultTbQueueRequestTemplateTest { inst.init(); assertThat(inst.nextCleanupNs, equalTo(0L)); verify(queueAdmin, times(1)).createTopicIfNotExists(topic); - verify(requestTemplate, times(1)).init(); verify(responseTemplate, times(1)).subscribe(); verify(executorMock, times(1)).submit(any(Runnable.class)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java index be7cbf7f84..84f972dc02 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java @@ -47,6 +47,7 @@ import org.thingsboard.server.common.data.query.EntityTypeFilter; import org.thingsboard.server.common.data.query.KeyFilter; import org.thingsboard.server.common.data.query.RelationsQueryFilter; import org.thingsboard.server.common.msg.edqs.EdqsApiService; +import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.common.stats.EdqsStatsService; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -88,6 +89,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe @Lazy EntityServiceRegistry entityServiceRegistry; + @Autowired + private EdqsService edqsService; + @Autowired @Lazy private EdqsApiService edqsApiService; @@ -104,7 +108,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe long startNs = System.nanoTime(); Long result; - if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) { + if (edqsService.isApiEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) { EdqsRequest request = EdqsRequest.builder() .entityCountQuery(query) .build(); @@ -126,7 +130,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe long startNs = System.nanoTime(); PageData result; - if (edqsApiService.isEnabled() && validForEdqs(query)) { + if (edqsService.isApiEnabled() && validForEdqs(query)) { EdqsRequest request = EdqsRequest.builder() .entityDataQuery(query) .build(); @@ -295,7 +299,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe } if ((query.getEntityFields() == null || query.getEntityFields().isEmpty()) && - (query.getLatestValues() == null || query.getLatestValues().isEmpty())) { + (query.getLatestValues() == null || query.getLatestValues().isEmpty())) { return false; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsApiService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsApiService.java index e486d3b645..f50bb9a68c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsApiService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsApiService.java @@ -35,24 +35,9 @@ public class DummyEdqsApiService implements EdqsApiService { throw new UnsupportedOperationException(); } - @Override - public boolean isEnabled() { - return false; - } - - @Override - public void setEnabled(boolean enabled) { - log.warn("Got request to enable EDQS API, but it isn't supported", new RuntimeException("stacktrace")); - } - @Override public boolean isSupported() { return false; } - @Override - public boolean isAutoEnable() { - return false; - } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java index 514e07c323..707273a11b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java @@ -19,6 +19,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsState; import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg; import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest; import org.thingsboard.server.common.data.id.EntityId; @@ -47,4 +48,14 @@ public class DummyEdqsService implements EdqsService { @Override public void processSystemMsg(ToCoreEdqsMsg request) {} + @Override + public boolean isApiEnabled() { + return getState().isApiEnabled(); + } + + @Override + public EdqsState getState() { + return new EdqsState(); + } + } diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index 6e6e975d68..f6b611d5a9 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -71,6 +71,10 @@ queue: max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}" # Maximum timeout for requests to EDQS max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" + # Thread pool size for EDQS requests executor + request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}" + # Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process. + versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}" # Strings longer than this threshold will be compressed string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}" stats: diff --git a/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java b/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java index 330d22a3c6..a7f80fdffc 100644 --- a/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java +++ b/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java @@ -60,7 +60,8 @@ import org.thingsboard.server.common.data.query.StringFilterPredicate; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.stats.DummyEdqsStatsService; -import org.thingsboard.server.edqs.util.EdqsConverter; +import org.thingsboard.server.edqs.util.DefaultEdqsMapper; +import org.thingsboard.server.edqs.util.EdqsMapper; import java.util.Collections; import java.util.List; @@ -79,7 +80,7 @@ public abstract class AbstractEDQTest { @Autowired protected DefaultEdqsRepository repository; @Autowired - protected EdqsConverter edqsConverter; + protected EdqsMapper edqsMapper; @MockBean private DummyEdqsStatsService edqsStatsService; @@ -244,12 +245,12 @@ public abstract class AbstractEDQTest { } protected void addOrUpdate(EntityType entityType, Object entity) { - addOrUpdate(EdqsConverter.toEntity(entityType, entity)); + addOrUpdate(DefaultEdqsMapper.toEntity(entityType, entity)); } protected void addOrUpdate(EdqsObject edqsObject) { - byte[] serialized = edqsConverter.serialize(edqsObject.type(), edqsObject); - edqsObject = edqsConverter.deserialize(edqsObject.type(), serialized); + byte[] serialized = edqsMapper.serialize(edqsObject); + edqsObject = edqsMapper.deserialize(edqsObject.type(), serialized, false); repository.get(tenantId).addOrUpdate(edqsObject); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java index aafe93f9f3..102f4b82ea 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java @@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.edqs.EdqsState; import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; @@ -738,13 +739,13 @@ public class TestRestClient { .as(Long.class); } - public Boolean isEdqsApiEnabled() { + public EdqsState getEdqsState() { return given().spec(requestSpec) - .get("/api/edqs/enabled") + .get("/api/edqs/state") .then() .statusCode(HTTP_OK) .extract() - .as(Boolean.class); + .as(EdqsState.class); } public void assignDeviceToCustomer(CustomerId customerId, DeviceId id) { diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/edqs/EdqsEntityDataQueryTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/edqs/EdqsEntityDataQueryTest.java index 53d8a72e7f..770d32e397 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/edqs/EdqsEntityDataQueryTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/edqs/EdqsEntityDataQueryTest.java @@ -73,7 +73,7 @@ public class EdqsEntityDataQueryTest extends AbstractContainerTest { @BeforeClass public void beforeClass() throws Exception { testRestClient.login("sysadmin@thingsboard.org", "sysadmin"); - await().atMost(60, TimeUnit.SECONDS).until(() -> testRestClient.isEdqsApiEnabled()); + await().atMost(60, TimeUnit.SECONDS).until(() -> testRestClient.getEdqsState().isApiEnabled()); tenantId = testRestClient.postTenant(EntityPrototypes.defaultTenantPrototype("Tenant")).getId(); tenantAdminId = testRestClient.createUserAndLogin(defaultTenantAdmin(tenantId, "tenantAdmin@thingsboard.org"), "tenant");