Browse Source

Merge pull request #13456 from thingsboard/rc

rc
pull/13461/head
Viacheslav Klimov 1 year ago
committed by GitHub
parent
commit
e054802fe2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 7
      application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java
  2. 2
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java
  3. 26
      application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java
  4. 155
      application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java
  5. 6
      application/src/main/resources/thingsboard.yml
  6. 76
      application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java
  7. 7
      application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java
  8. 9
      application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java
  9. 3
      common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java
  10. 6
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java
  11. 2
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java
  12. 18
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java
  13. 72
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java
  14. 4
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java
  15. 7
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java
  16. 4
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsMsg.java
  17. 6
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsRequest.java
  18. 5
      common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java
  19. 8
      common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java
  20. 68
      common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java
  21. 2
      common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/EntityNameQueryProcessor.java
  22. 3
      common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java
  23. 60
      common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java
  24. 21
      common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java
  25. 83
      common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java
  26. 30
      common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java
  27. 15
      common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java
  28. 6
      common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsApiService.java
  29. 5
      common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsService.java
  30. 2
      common/proto/src/main/proto/queue.proto
  31. 1
      common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java
  32. 5
      common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java
  33. 164
      common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java
  34. 5
      common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java
  35. 3
      common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java
  36. 4
      common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java
  37. 16
      common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java
  38. 35
      common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java
  39. 11
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java
  40. 2
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/DiscoveryService.java
  41. 9
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/DummyDiscoveryService.java
  42. 2
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java
  43. 13
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java
  44. 4
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java
  45. 70
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsExecutors.java
  46. 5
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java
  47. 26
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java
  48. 35
      common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java
  49. 3
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java
  50. 4
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java
  51. 5
      common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java
  52. 19
      common/queue/src/test/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplateTest.java
  53. 10
      dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java
  54. 15
      dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsApiService.java
  55. 11
      dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java
  56. 4
      edqs/src/main/resources/edqs.yml
  57. 11
      edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java
  58. 7
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java
  59. 2
      msa/black-box-tests/src/test/java/org/thingsboard/server/msa/edqs/EdqsEntityDataQueryTest.java

7
application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java

@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.TenantId;
@ -149,9 +150,9 @@ public class EntityQueryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN')")
@GetMapping("/edqs/enabled")
public boolean isEdqsApiEnabled() {
return edqsApiService.isEnabled();
@GetMapping("/edqs/state")
public EdqsState getEdqsState() {
return edqsService.getState();
}
}

2
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java

@ -72,7 +72,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
@Override
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
stateService.update(queueKey, partitions);
stateService.update(queueKey, partitions, null);
}
@Override

26
application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsApiService.java

@ -22,7 +22,6 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
@ -51,11 +50,6 @@ public class DefaultEdqsApiService implements EdqsApiService {
private final EdqsClientQueueFactory queueFactory;
private TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> requestTemplate;
@Value("${queue.edqs.api.auto_enable:true}")
private boolean autoEnable;
private Boolean apiEnabled = null;
@PostConstruct
private void init() {
requestTemplate = queueFactory.createEdqsRequestTemplate();
@ -85,31 +79,11 @@ public class DefaultEdqsApiService implements EdqsApiService {
}, MoreExecutors.directExecutor());
}
@Override
public boolean isEnabled() {
return Boolean.TRUE.equals(apiEnabled);
}
@Override
public void setEnabled(boolean enabled) {
if (enabled) {
log.info("Enabling EDQS API");
} else {
log.info("Disabling EDQS API");
}
apiEnabled = enabled;
}
@Override
public boolean isSupported() {
return true;
}
@Override
public boolean isAutoEnable() {
return autoEnable;
}
@PreDestroy
private void stop() {
requestTemplate.stop();

155
application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java

@ -20,11 +20,13 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -36,6 +38,9 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsEventType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode;
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus;
import org.thingsboard.server.common.data.edqs.EdqsSyncRequest;
import org.thingsboard.server.common.data.edqs.Entity;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
@ -51,20 +56,25 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.edqs.processor.EdqsProducer;
import org.thingsboard.server.edqs.state.EdqsPartitionService;
import org.thingsboard.server.edqs.util.EdqsConverter;
import org.thingsboard.server.edqs.util.DefaultEdqsMapper;
import org.thingsboard.server.edqs.util.EdqsMapper;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsCoreServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.HashPartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.environment.DistributedLock;
import org.thingsboard.server.queue.environment.DistributedLockService;
import org.thingsboard.server.queue.provider.EdqsClientQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
@ -74,31 +84,42 @@ import java.util.concurrent.TimeUnit;
public class DefaultEdqsService implements EdqsService {
private final EdqsClientQueueFactory queueFactory;
private final EdqsConverter edqsConverter;
private final EdqsMapper edqsMapper;
private final EdqsSyncService edqsSyncService;
private final EdqsApiService edqsApiService;
private final DistributedLockService distributedLockService;
private final AttributesService attributesService;
private final EdqsPartitionService edqsPartitionService;
private final TopicService topicService;
private final TbServiceInfoProvider serviceInfoProvider;
private final DiscoveryService discoveryService;
@Autowired @Lazy
private TbClusterService clusterService;
@Autowired @Lazy
private HashPartitionService hashPartitionService;
@Value("${queue.edqs.api.auto_enable:true}")
private boolean autoEnableApi;
@Value("${queue.edqs.readiness_check_interval:60000}")
private int edqsReadinessCheckInterval;
private EdqsProducer eventsProducer;
private ExecutorService executor;
private ScheduledExecutorService scheduler;
private DistributedLock syncLock;
@Getter
private EdqsState state;
@PostConstruct
private void init() {
executor = ThingsBoardExecutors.newWorkStealingPool(12, getClass());
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-check");
eventsProducer = EdqsProducer.builder()
.producer(queueFactory.createEdqsEventsProducer())
.partitionService(edqsPartitionService)
.build();
syncLock = distributedLockService.getLock("edqs_sync");
state = new EdqsState();
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
@ -106,6 +127,26 @@ public class DefaultEdqsService implements EdqsService {
if (!serviceInfoProvider.isService(ServiceType.TB_CORE)) {
return;
}
if (edqsApiService.isSupported()) {
scheduler.scheduleWithFixedDelay(() -> {
if (!hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) {
return;
}
List<ServiceInfo> servers = new ArrayList<>(discoveryService.getOtherServers());
servers.add(serviceInfoProvider.getServiceInfo());
List<ServiceInfo> readyEdqsServers = servers.stream()
.filter(serviceInfo -> serviceInfo.getServiceTypesList().contains(ServiceType.EDQS.name()))
.filter(ServiceInfo::getReady)
.toList();
boolean changed = state.setEdqsReady(!readyEdqsServers.isEmpty());
if (changed) {
broadcastEdqsReady(state.getEdqsReady());
}
}, 0, edqsReadinessCheckInterval, TimeUnit.MILLISECONDS);
}
executor.submit(() -> {
try {
EdqsSyncState syncState = getSyncState();
@ -115,9 +156,9 @@ public class DefaultEdqsService implements EdqsService {
.syncRequest(new EdqsSyncRequest())
.build());
}
} else if (edqsApiService.isSupported() && edqsApiService.isAutoEnable()) {
} else {
// only if topic/RocksDB is not empty and sync is finished
edqsApiService.setEnabled(true);
onSyncStatusUpdate(EdqsSyncStatus.FINISHED);
}
} catch (Throwable e) {
log.error("Failed to start EDQS service", e);
@ -131,7 +172,10 @@ public class DefaultEdqsService implements EdqsService {
if (request.getSyncRequest() != null) {
saveSyncState(EdqsSyncStatus.REQUESTED);
}
broadcast(request.toInternalMsg());
broadcast(ToCoreEdqsMsg.builder()
.syncRequest(request.getSyncRequest())
.apiEnabled(request.getApiEnabled())
.build());
}
@Override
@ -140,7 +184,18 @@ public class DefaultEdqsService implements EdqsService {
log.info("Processing system msg {}", msg);
try {
if (msg.getApiEnabled() != null) {
edqsApiService.setEnabled(msg.getApiEnabled());
if (msg.getApiEnabled()) {
state.setApiMode(EdqsApiMode.ENABLED);
} else {
state.setApiMode(EdqsApiMode.DISABLED);
}
log.info("New state: {}", state);
}
if (msg.getEdqsReady() != null) {
onEdqsReady(msg.getEdqsReady());
}
if (msg.getSyncStatus() != null) {
onSyncStatusUpdate(msg.getSyncStatus());
}
if (msg.getSyncRequest() != null) {
@ -154,23 +209,16 @@ public class DefaultEdqsService implements EdqsService {
return;
}
}
saveSyncState(EdqsSyncStatus.STARTED);
edqsSyncService.sync();
saveSyncState(EdqsSyncStatus.FINISHED);
if (edqsApiService.isSupported())
if (edqsApiService.isAutoEnable()) {
log.info("EDQS sync is finished, auto-enabling API");
broadcast(ToCoreEdqsMsg.builder()
.apiEnabled(Boolean.TRUE)
.build());
} else {
log.info("EDQS sync is finished, but leaving API disabled");
}
saveSyncState(EdqsSyncStatus.FINISHED);
broadcastSyncStatusUpdate(EdqsSyncStatus.FINISHED);
} catch (Exception e) {
log.error("Failed to complete sync", e);
saveSyncState(EdqsSyncStatus.FAILED);
broadcastSyncStatusUpdate(EdqsSyncStatus.FAILED);
} finally {
syncLock.unlock();
}
@ -181,6 +229,60 @@ public class DefaultEdqsService implements EdqsService {
});
}
private void broadcastEdqsReady(boolean ready) {
broadcast(ToCoreEdqsMsg.builder()
.edqsReady(ready)
.build());
}
private void onEdqsReady(boolean ready) {
state.setEdqsReady(ready);
checkState();
}
private void broadcastSyncStatusUpdate(EdqsSyncStatus status) {
broadcast(ToCoreEdqsMsg.builder()
.syncStatus(status)
.build());
}
private void onSyncStatusUpdate(EdqsSyncStatus status) {
state.setSyncStatus(status);
checkState();
}
private void checkState() {
if (!edqsApiService.isSupported()) {
log.info("New state: {}. EDQS API not supported", state);
return;
}
if (state.isApiReady()) {
if (autoEnableApi) {
if (state.getApiMode() == null || state.getApiMode() == EdqsApiMode.AUTO_DISABLED) {
state.setApiMode(EdqsApiMode.AUTO_ENABLED);
log.info("New state: {}. Auto-enabled EDQS API", state);
} else {
log.info("New state: {}. API mode left as is", state);
}
} else {
log.info("New state: {}. API auto-enabling is disabled", state);
}
} else {
if (state.isApiEnabled()) {
state.setApiMode(EdqsApiMode.AUTO_DISABLED);
log.info("New state: {}. Disabled EDQS API", state);
} else {
log.info("New state: {}. API left disabled", state);
}
}
}
@Override
public boolean isApiEnabled() {
return state.isApiEnabled();
}
@Override
public void onUpdate(TenantId tenantId, EntityId entityId, Object entity) {
EntityType entityType = entityId.getEntityType();
@ -189,7 +291,7 @@ public class DefaultEdqsService implements EdqsService {
log.trace("[{}][{}] Ignoring update event, type {} not supported", tenantId, entityId, entityType);
return;
}
onUpdate(tenantId, objectType, edqsConverter.toEntity(entityType, entity));
onUpdate(tenantId, objectType, DefaultEdqsMapper.toEntity(entityType, entity));
}
@Override
@ -216,12 +318,11 @@ public class DefaultEdqsService implements EdqsService {
protected void processEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType, EdqsObject object) {
executor.submit(() -> {
try {
String key = object.key();
String key = object.stringKey();
Long version = object.version();
EdqsEventMsg.Builder eventMsg = EdqsEventMsg.newBuilder()
.setKey(key)
.setObjectType(objectType.name())
.setData(ByteString.copyFrom(edqsConverter.serialize(objectType, object)))
.setData(ByteString.copyFrom(edqsMapper.serialize(object)))
.setEventType(eventType.name());
if (version != null) {
eventMsg.setVersion(version);
@ -278,6 +379,7 @@ public class DefaultEdqsService implements EdqsService {
@PreDestroy
private void stop() {
executor.shutdown();
scheduler.shutdownNow();
eventsProducer.stop();
}
@ -288,11 +390,4 @@ public class DefaultEdqsService implements EdqsService {
private EdqsSyncStatus status;
}
private enum EdqsSyncStatus {
REQUESTED,
STARTED,
FINISHED,
FAILED
}
}

6
application/src/main/resources/thingsboard.yml

@ -1764,6 +1764,8 @@ queue:
supported: "${TB_EDQS_API_SUPPORTED:false}"
# Whether to auto-enable EDQS API (if queue.edqs.api.supported is true) when sync of data to Kafka is finished
auto_enable: "${TB_EDQS_API_AUTO_ENABLE:true}"
# Interval in milliseconds to check for ready EDQS servers
readiness_check_interval: "${TB_EDQS_READINESS_CHECK_INTERVAL_MS:60000}"
# Mode of EDQS: local (for monolith) or remote (with separate EDQS microservices)
mode: "${TB_EDQS_MODE:local}"
local:
@ -1787,6 +1789,10 @@ queue:
max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}"
# Maximum timeout for requests to EDQS
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
# Thread pool size for EDQS requests executor
request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}"
# Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process.
versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}"
# Strings longer than this threshold will be compressed
string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}"
stats:

76
application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java

@ -15,21 +15,27 @@
*/
package org.thingsboard.server.controller;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.edqs.state.EdqsStateService;
import org.thingsboard.server.edqs.util.EdqsRocksDb;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@DaoSqlTest
@ -39,22 +45,23 @@ import static org.awaitility.Awaitility.await;
"queue.edqs.sync.enabled=true",
"queue.edqs.api.supported=true",
"queue.edqs.api.auto_enable=true",
"queue.edqs.mode=local"
"queue.edqs.mode=local",
"queue.edqs.readiness_check_interval=500"
})
public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
@Autowired
private EdqsApiService edqsApiService;
private EdqsService edqsService;
@Autowired
private EdqsStateService edqsStateService;
private DiscoveryService discoveryService;
@MockBean // so that we don't do backup for tests
private EdqsRocksDb edqsRocksDb;
@Before
public void before() {
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsApiService.isEnabled() && edqsStateService.isReady());
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.getState().isApiEnabled());
}
@Override
@ -69,4 +76,61 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
result -> result == expectedResult);
}
@Test
public void testEdqsState() {
assertThat(edqsService.getState().getApiMode()).isEqualTo(EdqsApiMode.AUTO_ENABLED);
// notifying EDQS is not ready: API should be auto-disabled
discoveryService.setReady(false);
verifyState(state -> {
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED);
assertThat(state.getEdqsReady()).isFalse();
assertThat(state.isApiEnabled()).isFalse();
});
// manually disabling API
edqsService.processSystemRequest(ToCoreEdqsRequest.builder()
.apiEnabled(false)
.build());
verifyState(state -> {
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED);
assertThat(state.getEdqsReady()).isFalse();
assertThat(state.isApiEnabled()).isFalse();
});
// notifying EDQS is ready: API should not be enabled automatically because manually disabled previously
discoveryService.setReady(true);
verifyState(state -> {
assertThat(state.getEdqsReady()).isTrue();
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED);
assertThat(state.isApiEnabled()).isFalse();
});
// manually enabling API
edqsService.processSystemRequest(ToCoreEdqsRequest.builder()
.apiEnabled(true)
.build());
verifyState(state -> {
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.ENABLED);
assertThat(state.getEdqsReady()).isTrue();
assertThat(state.isApiEnabled()).isTrue();
});
// notifying EDQS is not ready: API should be auto-disabled
discoveryService.setReady(false);
verifyState(state -> {
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED);
assertThat(state.getEdqsReady()).isFalse();
assertThat(state.isApiEnabled()).isFalse();
});
discoveryService.setReady(true);
}
private void verifyState(ThrowingConsumer<EdqsState> assertion) {
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(edqsService.getState()).satisfies(assertion);
});
}
}

7
application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java

@ -22,6 +22,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.ResultActions;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
import org.thingsboard.common.util.JacksonUtil;
@ -78,6 +79,10 @@ import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@DaoSqlTest
@TestPropertySource(properties = {
"queue.edqs.sync.enabled=true", // only enabling sync
"queue.edqs.api.supported=false",
})
public class EntityQueryControllerTest extends AbstractControllerTest {
private static final String CUSTOMER_USER_EMAIL = "entityQueryCustomer@thingsboard.org";
@ -803,7 +808,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
//assign dashboard
doPost("/api/customer/" + savedCustomer.getId().getId().toString()
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
// check entity data query by customer
User customerUser = new User();

9
application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java

@ -33,7 +33,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.edqs.util.EdqsRocksDb;
@ -53,19 +53,20 @@ import static org.awaitility.Awaitility.await;
"queue.edqs.sync.enabled=true",
"queue.edqs.api.supported=true",
"queue.edqs.api.auto_enable=true",
"queue.edqs.mode=local"
"queue.edqs.mode=local",
"queue.edqs.readiness_check_interval=1000"
})
public class EdqsEntityServiceTest extends EntityServiceTest {
@Autowired
private EdqsApiService edqsApiService;
private EdqsService edqsService;
@MockBean
private EdqsRocksDb edqsRocksDb;
@Before
public void beforeEach() {
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsApiService.isEnabled());
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled());
}
// sql implementation has a bug with data duplication, edqs implementation returns correct value

3
common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueProducer.java

@ -19,11 +19,10 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
public interface TbQueueProducer<T extends TbQueueMsg> {
void init();
String getDefaultTopic();
void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback);
void stop();
}

6
common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java

@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import java.util.UUID;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ -58,7 +60,7 @@ public class AttributeKv implements EdqsObject {
}
@Override
public String key() {
public String stringKey() {
return "a_" + entityId + "_" + scope + "_" + key;
}
@ -72,4 +74,6 @@ public class AttributeKv implements EdqsObject {
return ObjectType.ATTRIBUTE_KV;
}
public record Key(UUID entityId, AttributeScope scope, int key) implements EdqsObjectKey {}
}

2
common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java

@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.ObjectType;
public interface EdqsObject {
@JsonIgnore
String key();
String stringKey();
@JsonIgnore
Long version();

18
common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java

@ -0,0 +1,18 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.edqs;
public interface EdqsObjectKey {}

72
common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java

@ -0,0 +1,72 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.edqs;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.BooleanUtils;
@Getter
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class EdqsState {
private Boolean edqsReady;
@Setter
private EdqsSyncStatus syncStatus;
@Setter
private EdqsApiMode apiMode;
public boolean setEdqsReady(boolean ready) {
boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.edqsReady, false) != ready;
this.edqsReady = ready;
return changed;
}
public boolean isApiReady() {
return edqsReady && syncStatus == EdqsSyncStatus.FINISHED;
}
public boolean isApiEnabled() {
return apiMode != null && (apiMode == EdqsApiMode.ENABLED || apiMode == EdqsApiMode.AUTO_ENABLED);
}
@Override
public String toString() {
return '[' +
"EDQS ready: " + edqsReady +
", sync status: " + syncStatus +
", API mode: " + apiMode +
']';
}
public enum EdqsSyncStatus {
REQUESTED,
STARTED,
FINISHED,
FAILED
}
public enum EdqsApiMode {
ENABLED,
AUTO_ENABLED,
DISABLED,
AUTO_DISABLED
}
}

4
common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java

@ -51,7 +51,7 @@ public class Entity implements EdqsObject {
}
@Override
public String key() {
public String stringKey() {
return "e_" + fields.getId().toString();
}
@ -65,4 +65,6 @@ public class Entity implements EdqsObject {
return ObjectType.fromEntityType(type);
}
public record Key(UUID id) implements EdqsObjectKey {}
}

7
common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java

@ -24,6 +24,8 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.UUID;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ -53,7 +55,8 @@ public class LatestTsKv implements EdqsObject {
this.version = version != null ? version : 0L;
}
public String key() {
@Override
public String stringKey() {
return "l_" + entityId + "_" + key;
}
@ -67,4 +70,6 @@ public class LatestTsKv implements EdqsObject {
return ObjectType.LATEST_TS_KV;
}
public record Key(UUID entityId, int key) implements EdqsObjectKey {}
}

4
common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsMsg.java

@ -19,6 +19,7 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus;
@Data
@AllArgsConstructor
@ -29,4 +30,7 @@ public class ToCoreEdqsMsg {
private EdqsSyncRequest syncRequest;
private Boolean apiEnabled;
private EdqsSyncStatus syncStatus;
private Boolean edqsReady;
}

6
common/data/src/main/java/org/thingsboard/server/common/data/edqs/ToCoreEdqsRequest.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.common.data.edqs;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -30,9 +29,4 @@ public class ToCoreEdqsRequest {
private EdqsSyncRequest syncRequest;
private Boolean apiEnabled;
@JsonIgnore
public ToCoreEdqsMsg toInternalMsg() {
return new ToCoreEdqsMsg(syncRequest, apiEnabled);
}
}

5
common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java

@ -29,9 +29,7 @@ public interface EntityFields {
Logger log = LoggerFactory.getLogger(EntityFields.class);
default UUID getId() {
return null;
}
UUID getId();
default UUID getTenantId() {
return null;
@ -147,6 +145,7 @@ public interface EntityFields {
default String getAsString(String key) {
return switch (key) {
case "id" -> getId().toString();
case "createdTime" -> Long.toString(getCreatedTime());
case "title" -> getName();
case "type" -> getType();

8
common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java

@ -27,10 +27,12 @@ import org.thingsboard.server.common.data.BaseDataWithAdditionalInfo;
import org.thingsboard.server.common.data.HasVersion;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.validation.Length;
import java.io.Serializable;
import java.util.UUID;
@Slf4j
@Schema
@ -119,8 +121,8 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject {
BaseDataWithAdditionalInfo.setJson(addInfo, json -> this.additionalInfo = json, bytes -> this.additionalInfoBytes = bytes);
}
@JsonIgnore
public String key() {
@Override
public String stringKey() {
return "r_" + from + "_" + to + "_" + typeGroup + "_" + type;
}
@ -134,4 +136,6 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject {
return ObjectType.RELATION;
}
public record Key(UUID from, UUID to, RelationTypeGroup typeGroup, String type) implements EdqsObjectKey {}
}

68
common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java

@ -18,7 +18,6 @@ package org.thingsboard.server.edqs.processor;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
@ -29,7 +28,6 @@ import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ExceptionUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsEvent;
@ -47,31 +45,31 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.edqs.repo.EdqsRepository;
import org.thingsboard.server.edqs.state.EdqsPartitionService;
import org.thingsboard.server.edqs.state.EdqsStateService;
import org.thingsboard.server.edqs.util.EdqsConverter;
import org.thingsboard.server.edqs.util.EdqsMapper;
import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsConfig.EdqsPartitioningStrategy;
import org.thingsboard.server.queue.edqs.EdqsExecutors;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -85,24 +83,20 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop
public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> {
private final EdqsQueueFactory queueFactory;
private final EdqsConverter converter;
private final EdqsMapper mapper;
private final EdqsRepository repository;
private final EdqsConfig config;
private final EdqsExecutors edqsExecutors;
private final EdqsPartitionService partitionService;
private final DiscoveryService discoveryService;
private final TopicService topicService;
private final ConfigurableApplicationContext applicationContext;
private final EdqsStateService stateService;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
private TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
private ExecutorService consumersExecutor;
private ExecutorService taskExecutor;
private ScheduledExecutorService scheduler;
private PartitionedQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
private ListeningExecutorService requestExecutor;
private final VersionsStore versionsStore = new VersionsStore();
private VersionsStore versionsStore;
private final AtomicInteger counter = new AtomicInteger();
@Getter
@ -110,18 +104,17 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
@PostConstruct
private void init() {
consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer"));
taskExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-task-executor");
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-scheduler");
requestExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(12, "edqs-requests"));
errorHandler = error -> {
if (error instanceof OutOfMemoryError) {
log.error("OOM detected, shutting down");
repository.clear();
discoveryService.setReady(false);
Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edqs-shutdown"))
.execute(applicationContext::close);
}
};
requestExecutor = edqsExecutors.getRequestExecutor();
versionsStore = new VersionsStore(config.getVersionsCacheTtl());
eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic()))
@ -143,19 +136,14 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
})
.consumerCreator((config, tpi) -> queueFactory.createEdqsEventsConsumer())
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(consumersExecutor)
.taskExecutor(taskExecutor)
.scheduler(scheduler)
.consumerExecutor(edqsExecutors.getConsumersExecutor())
.taskExecutor(edqsExecutors.getConsumerTaskExecutor())
.scheduler(edqsExecutors.getScheduler())
.uncaughtErrorHandler(errorHandler)
.build();
stateService.init(eventConsumer);
responseTemplate = queueFactory.createEdqsResponseTemplate(this);
responseTemplate = queueFactory.createEdqsResponseTemplate();
}
@AfterStartUp(order = 1)
public void start() {
responseTemplate.launch(this);
stateService.init(eventConsumer, List.of(responseTemplate.getRequestConsumer()));
}
@EventListener
@ -165,10 +153,8 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
}
try {
Set<TopicPartitionInfo> newPartitions = event.getNewPartitions().get(new QueueKey(ServiceType.EDQS));
stateService.process(withTopic(newPartitions, topicService.buildTopicName(config.getStateTopic())));
// eventsConsumer's partitions are updated by stateService
responseTemplate.subscribe(withTopic(newPartitions, topicService.buildTopicName(config.getRequestsTopic()))); // TODO: we subscribe to partitions before we are ready. implement consumer-per-partition version for request template
// partitions for event and request consumers are updated by stateService
Set<TopicPartitionInfo> oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS));
if (CollectionsUtil.isNotEmpty(oldPartitions)) {
@ -243,22 +229,20 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
TenantId tenantId = getTenantId(edqsMsg);
ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType());
EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType());
String key = eventMsg.getKey();
Long version = eventMsg.hasVersion() ? eventMsg.getVersion() : null;
EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), false);
if (version != null) {
if (!versionsStore.isNew(key, version)) {
if (!versionsStore.isNew(mapper.getKey(object), version)) {
return;
}
} else if (!ObjectType.unversionedTypes.contains(objectType)) {
log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key);
log.warn("[{}] {} doesn't have version: {}", tenantId, objectType, object);
}
if (backup) {
stateService.save(tenantId, objectType, key, eventType, edqsMsg);
stateService.save(tenantId, objectType, object.stringKey(), eventType, edqsMsg);
}
EdqsObject object = converter.deserialize(objectType, eventMsg.getData().toByteArray());
log.debug("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version);
int count = counter.incrementAndGet();
if (count % 100000 == 0) {
log.info("Processed {} events", count);
@ -270,6 +254,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
.eventType(eventType)
.object(object)
.build();
log.debug("Processing event: {}", event);
repository.processEvent(event);
}
}
@ -292,11 +277,6 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
eventConsumer.awaitStop();
responseTemplate.stop();
stateService.stop();
consumersExecutor.shutdownNow();
taskExecutor.shutdownNow();
scheduler.shutdownNow();
requestExecutor.shutdownNow();
}
}

2
common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/EntityNameQueryProcessor.java

@ -35,7 +35,7 @@ public class EntityNameQueryProcessor extends AbstractSimpleQueryProcessor<Entit
@Override
protected boolean matches(EntityData ed) {
return ed.getFields() != null && (pattern == null || pattern.matcher(ed.getFields().getName()).matches());
return super.matches(ed) && (pattern == null || pattern.matcher(ed.getFields().getName()).matches());
}
}

3
common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java

@ -23,11 +23,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import java.util.List;
import java.util.Set;
public interface EdqsStateService {
void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer);
void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers);
void process(Set<TopicPartitionInfo> partitions);

60
common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java

@ -22,11 +22,13 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsEventType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.edqs.processor.EdqsProcessor;
import org.thingsboard.server.edqs.processor.EdqsProducer;
import org.thingsboard.server.edqs.util.EdqsMapper;
import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
@ -34,16 +36,18 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.common.state.QueueStateService;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsExecutors;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@ -60,23 +64,28 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsConfig config;
private final EdqsPartitionService partitionService;
private final KafkaEdqsQueueFactory queueFactory;
private final DiscoveryService discoveryService;
private final EdqsExecutors edqsExecutors;
private final EdqsMapper mapper;
private final TopicService topicService;
@Autowired
@Lazy
private EdqsProcessor edqsProcessor;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> stateConsumer;
private QueueStateService<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>> queueStateService;
private KafkaQueueStateService<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>> queueStateService;
private QueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventsToBackupConsumer;
private EdqsProducer stateProducer;
private VersionsStore versionsStore;
private final VersionsStore versionsStore = new VersionsStore();
private final AtomicInteger stateReadCount = new AtomicInteger();
private final AtomicInteger eventsReadCount = new AtomicInteger();
private Boolean ready;
private boolean ready = false;
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer) {
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
versionsStore = new VersionsStore(config.getVersionsCacheTtl());
TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin();
stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic()))
@ -98,9 +107,9 @@ public class KafkaEdqsStateService implements EdqsStateService {
})
.consumerCreator((config, tpi) -> queueFactory.createEdqsStateConsumer())
.queueAdmin(queueAdmin)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.taskExecutor(eventConsumer.getTaskExecutor())
.scheduler(eventConsumer.getScheduler())
.consumerExecutor(edqsExecutors.getConsumersExecutor())
.taskExecutor(edqsExecutors.getConsumerTaskExecutor())
.scheduler(edqsExecutors.getScheduler())
.uncaughtErrorHandler(edqsProcessor.getErrorHandler())
.build();
@ -119,22 +128,25 @@ public class KafkaEdqsStateService implements EdqsStateService {
if (msg.hasEventMsg()) {
EdqsEventMsg eventMsg = msg.getEventMsg();
String key = eventMsg.getKey();
int count = eventsReadCount.incrementAndGet();
if (count % 100000 == 0) {
log.info("[events-to-backup] Processed {} msgs", count);
}
ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType());
EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), true);
if (eventMsg.hasVersion()) {
if (!versionsStore.isNew(key, eventMsg.getVersion())) {
if (!versionsStore.isNew(mapper.getKey(object), eventMsg.getVersion())) {
continue;
}
}
TenantId tenantId = getTenantId(msg);
ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType());
EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType());
String key = object.stringKey();
log.trace("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key);
stateProducer.send(tenantId, objectType, key, msg);
stateProducer.send(tenantId, objectType, object.stringKey(), msg);
int count = eventsReadCount.incrementAndGet();
if (count % 100000 == 0) {
log.info("[events-to-backup] Processed {} msgs", count);
}
}
} catch (Throwable t) {
log.error("Failed to process message: {}", queueMsg, t);
@ -143,7 +155,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
consumer.commit();
})
.consumerCreator(() -> eventsToBackupKafkaConsumer)
.consumerExecutor(eventConsumer.getConsumerExecutor())
.consumerExecutor(edqsExecutors.getConsumersExecutor())
.threadPrefix("edqs-events-to-backup")
.build();
@ -155,6 +167,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
queueStateService = KafkaQueueStateService.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>>builder()
.eventConsumer(eventConsumer)
.stateConsumer(stateConsumer)
.otherConsumers(otherConsumers)
.eventsStartOffsetsProvider(() -> {
// taking start offsets for events topics from the events-to-backup consumer group,
// since eventConsumer doesn't use consumer group management and thus offset tracking
@ -185,7 +198,10 @@ public class KafkaEdqsStateService implements EdqsStateService {
eventsToBackupConsumer.subscribe(allPartitions);
eventsToBackupConsumer.launch();
}
queueStateService.update(new QueueKey(ServiceType.EDQS), partitions);
queueStateService.update(new QueueKey(ServiceType.EDQS), partitions, () -> {
ready = true;
discoveryService.setReady(true);
});
}
@Override
@ -195,13 +211,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
@Override
public boolean isReady() {
if (ready == null) {
Set<TopicPartitionInfo> partitionsInProgress = queueStateService.getPartitionsInProgress();
if (partitionsInProgress != null && partitionsInProgress.isEmpty()) {
ready = true; // once true - always true, not to change readiness status on each repartitioning
}
}
return ready != null && ready;
return ready;
}
private TenantId getTenantId(ToEdqsMsg edqsMsg) {

21
common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java

@ -29,8 +29,10 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
import java.util.List;
import java.util.Set;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@ -42,20 +44,24 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop
public class LocalEdqsStateService implements EdqsStateService {
private final EdqsRocksDb db;
private final DiscoveryService discoveryService;
@Autowired @Lazy
private EdqsProcessor processor;
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
private Set<TopicPartitionInfo> partitions;
private List<PartitionedQueueConsumerManager<?>> otherConsumers;
private boolean ready = false;
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer) {
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
this.eventConsumer = eventConsumer;
this.otherConsumers = otherConsumers;
}
@Override
public void process(Set<TopicPartitionInfo> partitions) {
if (this.partitions == null) {
if (!ready) {
db.forEach((key, value) -> {
try {
ToEdqsMsg edqsMsg = ToEdqsMsg.parseFrom(value);
@ -67,8 +73,13 @@ public class LocalEdqsStateService implements EdqsStateService {
});
log.info("Restore completed");
}
ready = true;
discoveryService.setReady(true);
eventConsumer.update(withTopic(partitions, eventConsumer.getTopic()));
this.partitions = partitions;
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
consumer.update(withTopic(partitions, consumer.getTopic()));
}
}
@Override
@ -87,7 +98,7 @@ public class LocalEdqsStateService implements EdqsStateService {
@Override
public boolean isReady() {
return partitions != null;
return ready;
}
@Override

83
common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java → common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java

@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor;
@ -36,6 +35,7 @@ import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.AttributeKv;
import org.thingsboard.server.common.data.edqs.DataPoint;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
import org.thingsboard.server.common.data.edqs.Entity;
import org.thingsboard.server.common.data.edqs.LatestTsKv;
import org.thingsboard.server.common.data.edqs.fields.FieldsUtil;
@ -52,6 +52,7 @@ import org.thingsboard.server.edqs.data.dp.DoubleDataPoint;
import org.thingsboard.server.edqs.data.dp.JsonDataPoint;
import org.thingsboard.server.edqs.data.dp.LongDataPoint;
import org.thingsboard.server.edqs.data.dp.StringDataPoint;
import org.thingsboard.server.edqs.repo.KeyDictionary;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DataPointProto;
import org.xerial.snappy.Snappy;
@ -65,19 +66,29 @@ import java.util.UUID;
@Service
@RequiredArgsConstructor
@Slf4j
public class EdqsConverter {
public class DefaultEdqsMapper implements EdqsMapper {
private final EdqsStatsService edqsStatsService;
@Value("${queue.edqs.string_compression_length_threshold:512}")
private int stringCompressionLengthThreshold;
private final Map<ObjectType, Converter<? extends EdqsObject>> converters = new HashMap<>();
private final Converter<Entity> defaultConverter = new JsonConverter<>(Entity.class);
private final Map<ObjectType, Mapper<? extends EdqsObject>> mappers = new HashMap<>();
private final Mapper<Entity> defaultMapper = new JsonMapper<>(Entity.class) {
@Override
public EdqsObjectKey getKey(Entity entity) {
return new Entity.Key(entity.getFields().getId());
}
};
{
converters.put(ObjectType.RELATION, new JsonConverter<>(EntityRelation.class));
converters.put(ObjectType.ATTRIBUTE_KV, new Converter<AttributeKv>() {
mappers.put(ObjectType.RELATION, new JsonMapper<>(EntityRelation.class) {
@Override
public EdqsObjectKey getKey(EntityRelation relation) {
return new EntityRelation.Key(relation.getFrom().getId(), relation.getTo().getId(), relation.getTypeGroup(), relation.getType());
}
});
mappers.put(ObjectType.ATTRIBUTE_KV, new Mapper<AttributeKv>() {
@Override
public byte[] serialize(ObjectType type, AttributeKv attributeKv) {
var proto = TransportProtos.AttributeKvProto.newBuilder()
@ -94,12 +105,12 @@ public class EdqsConverter {
}
@Override
public AttributeKv deserialize(ObjectType type, byte[] bytes) throws Exception {
public AttributeKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception {
TransportProtos.AttributeKvProto proto = TransportProtos.AttributeKvProto.parseFrom(bytes);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()),
new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
AttributeScope scope = AttributeScope.values()[proto.getScope().getNumber()];
DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null;
DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint());
return AttributeKv.builder()
.entityId(entityId)
.scope(scope)
@ -108,8 +119,13 @@ public class EdqsConverter {
.dataPoint(dataPoint)
.build();
}
@Override
public EdqsObjectKey getKey(AttributeKv attributeKv) {
return new AttributeKv.Key(attributeKv.getEntityId().getId(), attributeKv.getScope(), KeyDictionary.get(attributeKv.getKey()));
}
});
converters.put(ObjectType.LATEST_TS_KV, new Converter<LatestTsKv>() {
mappers.put(ObjectType.LATEST_TS_KV, new Mapper<LatestTsKv>() {
@Override
public byte[] serialize(ObjectType type, LatestTsKv latestTsKv) {
var proto = TransportProtos.LatestTsKvProto.newBuilder()
@ -125,11 +141,11 @@ public class EdqsConverter {
}
@Override
public LatestTsKv deserialize(ObjectType type, byte[] bytes) throws Exception {
public LatestTsKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception {
TransportProtos.LatestTsKvProto proto = TransportProtos.LatestTsKvProto.parseFrom(bytes);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()),
new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null;
DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint());
return LatestTsKv.builder()
.entityId(entityId)
.key(proto.getKey())
@ -137,6 +153,11 @@ public class EdqsConverter {
.dataPoint(dataPoint)
.build();
}
@Override
public EdqsObjectKey getKey(LatestTsKv latestTsKv) {
return new LatestTsKv.Key(latestTsKv.getEntityId().getId(), KeyDictionary.get(latestTsKv.getKey()));
}
});
}
@ -223,30 +244,30 @@ public class EdqsConverter {
@SuppressWarnings("unchecked")
@SneakyThrows
public <T extends EdqsObject> byte[] serialize(ObjectType type, T value) {
Converter<T> converter = (Converter<T>) converters.get(type);
if (converter != null) {
return converter.serialize(type, value);
} else {
return defaultConverter.serialize(type, (Entity) value);
}
public <T extends EdqsObject> byte[] serialize(T value) {
ObjectType type = value.type();
Mapper<T> mapper = (Mapper<T>) mappers.getOrDefault(type, defaultMapper);
return mapper.serialize(type, value);
}
@SneakyThrows
public EdqsObject deserialize(ObjectType type, byte[] bytes) {
Converter<? extends EdqsObject> converter = converters.get(type);
if (converter != null) {
return converter.deserialize(type, bytes);
} else {
return defaultConverter.deserialize(type, bytes);
}
public EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey) {
Mapper<? extends EdqsObject> mapper = mappers.getOrDefault(type, defaultMapper);
return mapper.deserialize(type, bytes, onlyKey);
}
@SuppressWarnings("unchecked")
@SneakyThrows
public <T extends EdqsObject> EdqsObjectKey getKey(T object) {
Mapper<T> mapper = (Mapper<T>) mappers.getOrDefault(object.type(), defaultMapper);
return mapper.getKey(object);
}
@RequiredArgsConstructor
private static class JsonConverter<T> implements Converter<T> {
private static abstract class JsonMapper<T> implements Mapper<T> {
private static final SimpleModule module = new SimpleModule();
private static final ObjectMapper mapper = JsonMapper.builder()
private static final ObjectMapper mapper = com.fasterxml.jackson.databind.json.JsonMapper.builder()
.visibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
.visibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
.visibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE)
@ -267,17 +288,19 @@ public class EdqsConverter {
@SneakyThrows
@Override
public T deserialize(ObjectType objectType, byte[] bytes) {
public T deserialize(ObjectType objectType, byte[] bytes, boolean onlyKey) {
return mapper.readValue(bytes, this.type);
}
}
private interface Converter<T> {
private interface Mapper<T> {
byte[] serialize(ObjectType type, T value) throws Exception;
T deserialize(ObjectType type, byte[] bytes) throws Exception;
T deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception;
EdqsObjectKey getKey(T object);
}

30
common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edqs.util;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
public interface EdqsMapper {
<T extends EdqsObject> byte[] serialize(T value);
EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey);
<T extends EdqsObject> EdqsObjectKey getKey(T object);
}

15
common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java

@ -18,6 +18,7 @@ package org.thingsboard.server.edqs.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -25,18 +26,22 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class VersionsStore {
private final Cache<String, Long> versions = Caffeine.newBuilder()
.expireAfterWrite(24, TimeUnit.HOURS)
.build();
private final Cache<EdqsObjectKey, Long> versions;
public boolean isNew(String key, Long version) {
public VersionsStore(int ttlMinutes) {
this.versions = Caffeine.newBuilder()
.expireAfterWrite(ttlMinutes, TimeUnit.MINUTES)
.build();
}
public boolean isNew(EdqsObjectKey key, Long version) {
AtomicBoolean isNew = new AtomicBoolean(false);
versions.asMap().compute(key, (k, prevVersion) -> {
if (prevVersion == null || prevVersion <= version) {
isNew.set(true);
return version;
} else {
log.info("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion);
log.debug("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion);
return prevVersion;
}
});

6
common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsApiService.java

@ -25,12 +25,6 @@ public interface EdqsApiService {
ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request);
boolean isEnabled();
void setEnabled(boolean enabled);
boolean isSupported();
boolean isAutoEnable();
}

5
common/message/src/main/java/org/thingsboard/server/common/msg/edqs/EdqsService.java

@ -17,6 +17,7 @@ package org.thingsboard.server.common.msg.edqs;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
import org.thingsboard.server.common.data.id.EntityId;
@ -36,4 +37,8 @@ public interface EdqsService {
void processSystemMsg(ToCoreEdqsMsg request);
boolean isApiEnabled();
EdqsState getState();
}

2
common/proto/src/main/proto/queue.proto

@ -89,6 +89,7 @@ message ServiceInfo {
SystemInfoProto systemInfo = 10;
repeated string assignedTenantProfiles = 11;
string label = 12;
bool ready = 13;
}
message SystemInfoProto {
@ -1836,7 +1837,6 @@ message FromEdqsMsg {
}
message EdqsEventMsg {
string key = 1;
string objectType = 2;
bytes data = 3;
string eventType = 4;

1
common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplate.java

@ -91,7 +91,6 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
@Override
public void init() {
queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
requestTemplate.init();
responseTemplate.subscribe();
executor.submit(this::mainLoop);
}

5
common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java

@ -30,8 +30,6 @@ import org.thingsboard.server.queue.TbQueueResponseTemplate;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -44,7 +42,6 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
private final TbQueueConsumer<Request> requestTemplate;
private final TbQueueProducer<Response> responseTemplate;
private final ConcurrentMap<UUID, String> pendingRequests;
private final ExecutorService loopExecutor;
private final ScheduledExecutorService timeoutExecutor;
private final ExecutorService callbackExecutor;
@ -67,7 +64,6 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
MessagesStats stats) {
this.requestTemplate = requestTemplate;
this.responseTemplate = responseTemplate;
this.pendingRequests = new ConcurrentHashMap<>();
this.maxPendingRequests = maxPendingRequests;
this.pollInterval = pollInterval;
this.requestTimeout = requestTimeout;
@ -89,7 +85,6 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
@Override
public void launch(TbQueueHandler<Request, Response> handler) {
this.responseTemplate.init();
loopExecutor.submit(() -> {
while (!stopped) {
try {

164
common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java

@ -0,0 +1,164 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@Slf4j
public class PartitionedQueueResponseTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> extends AbstractTbQueueTemplate {
@Getter
private final PartitionedQueueConsumerManager<Request> requestConsumer;
private final TbQueueProducer<Response> responseProducer;
private final TbQueueHandler<Request, Response> handler;
private final long pollInterval;
private final int maxPendingRequests;
private final long requestTimeout;
private final MessagesStats stats;
private final ScheduledExecutorService scheduler;
private final ExecutorService callbackExecutor;
private final AtomicInteger pendingRequestCount = new AtomicInteger();
@Builder
public PartitionedQueueResponseTemplate(String key,
TbQueueHandler<Request, Response> handler,
String requestsTopic,
Function<TopicPartitionInfo, TbQueueConsumer<Request>> consumerCreator,
TbQueueProducer<Response> responseProducer,
long pollInterval,
long requestTimeout,
int maxPendingRequests,
ExecutorService consumerExecutor,
ExecutorService callbackExecutor,
ExecutorService consumerTaskExecutor,
MessagesStats stats) {
this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(key + "-queue-response-template-scheduler");
this.callbackExecutor = callbackExecutor;
this.handler = handler;
this.requestConsumer = PartitionedQueueConsumerManager.<Request>create()
.queueKey(key + "-requests")
.topic(requestsTopic)
.pollInterval(pollInterval)
.msgPackProcessor((requests, consumer, config) -> processRequests(requests, consumer))
.consumerCreator((config, tpi) -> consumerCreator.apply(tpi))
.consumerExecutor(consumerExecutor)
.scheduler(scheduler)
.taskExecutor(consumerTaskExecutor)
.build();
this.responseProducer = responseProducer;
this.pollInterval = pollInterval;
this.maxPendingRequests = maxPendingRequests;
this.requestTimeout = requestTimeout;
this.stats = stats;
}
private void processRequests(List<Request> requests, TbQueueConsumer<Request> consumer) {
while (pendingRequestCount.get() >= maxPendingRequests) {
try {
Thread.sleep(pollInterval);
} catch (InterruptedException e) {
log.trace("Failed to wait until the server has capacity to handle new requests", e);
}
}
requests.forEach(request -> {
long currentTime = System.currentTimeMillis();
long expireTs = bytesToLong(request.getHeaders().get(EXPIRE_TS_HEADER));
if (expireTs >= currentTime) {
byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header", request);
return;
}
byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
if (responseTopicHeader == null) {
log.error("[{}] Missing response topic in header", request);
return;
}
UUID requestId = bytesToUuid(requestIdHeader);
String responseTopic = bytesToString(responseTopicHeader);
try {
pendingRequestCount.getAndIncrement();
stats.incrementTotal();
AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
response -> {
pendingRequestCount.decrementAndGet();
response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
responseProducer.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
stats.incrementSuccessful();
},
e -> {
pendingRequestCount.decrementAndGet();
if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
} else {
log.trace("[{}] Failed to process the request: {}", requestId, request, e);
}
stats.incrementFailed();
},
requestTimeout,
scheduler,
callbackExecutor);
} catch (Throwable e) {
pendingRequestCount.decrementAndGet();
log.warn("[{}] Failed to process the request: {}", requestId, request, e);
stats.incrementFailed();
}
}
});
consumer.commit();
}
public void subscribe(Set<TopicPartitionInfo> partitions) {
requestConsumer.update(partitions);
}
public void stop() {
if (requestConsumer != null) {
requestConsumer.stop();
requestConsumer.awaitStop();
}
if (responseProducer != null) {
responseProducer.stop();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
}

5
common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java

@ -25,7 +25,6 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdateConfigTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.UpdatePartitionsTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.Collection;
@ -50,7 +49,7 @@ import java.util.function.Function;
public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
@Getter
protected final QueueKey queueKey;
protected final Object queueKey;
@Getter
protected C config;
protected final MsgPackProcessor<M, C> msgPackProcessor;
@ -72,7 +71,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
protected volatile boolean stopped;
@Builder
public MainQueueConsumerManager(QueueKey queueKey, C config,
public MainQueueConsumerManager(Object queueKey, C config,
MsgPackProcessor<M, C> msgPackProcessor,
BiFunction<C, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor,

3
common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java

@ -26,7 +26,6 @@ import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.AddPartitionsTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.DeletePartitionsTask;
import org.thingsboard.server.queue.common.consumer.TbQueueConsumerManagerTask.RemovePartitionsTask;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@ -44,7 +43,7 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
private final String topic;
@Builder(builderMethodName = "create") // not to conflict with super.builder()
public PartitionedQueueConsumerManager(QueueKey queueKey, String topic, long pollInterval, MsgPackProcessor<M, QueueConfig> msgPackProcessor,
public PartitionedQueueConsumerManager(Object queueKey, String topic, long pollInterval, MsgPackProcessor<M, QueueConfig> msgPackProcessor,
BiFunction<QueueConfig, TopicPartitionInfo, TbQueueConsumer<M>> consumerCreator, TbQueueAdmin queueAdmin,
ExecutorService consumerExecutor, ScheduledExecutorService scheduler,
ExecutorService taskExecutor, Consumer<Throwable> uncaughtErrorHandler) {

4
common/queue/src/main/java/org/thingsboard/server/queue/common/state/DefaultQueueStateService.java

@ -18,10 +18,12 @@ package org.thingsboard.server.queue.common.state;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import java.util.Collections;
public class DefaultQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> extends QueueStateService<E, S> {
public DefaultQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer) {
super(eventConsumer);
super(eventConsumer, Collections.emptyList());
}
}

16
common/queue/src/main/java/org/thingsboard/server/queue/common/state/KafkaQueueStateService.java

@ -22,8 +22,11 @@ import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic;
@ -34,17 +37,20 @@ public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg>
private final PartitionedQueueConsumerManager<S> stateConsumer;
private final Supplier<Map<String, Long>> eventsStartOffsetsProvider;
private final Set<TopicPartitionInfo> partitionsInProgress = ConcurrentHashMap.newKeySet();
@Builder
public KafkaQueueStateService(PartitionedQueueConsumerManager<E> eventConsumer,
PartitionedQueueConsumerManager<S> stateConsumer,
List<PartitionedQueueConsumerManager<?>> otherConsumers,
Supplier<Map<String, Long>> eventsStartOffsetsProvider) {
super(eventConsumer);
super(eventConsumer, otherConsumers != null ? otherConsumers : Collections.emptyList());
this.stateConsumer = stateConsumer;
this.eventsStartOffsetsProvider = eventsStartOffsetsProvider;
}
@Override
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, Runnable whenAllProcessed) {
Map<String, Long> eventsStartOffsets = eventsStartOffsetsProvider != null ? eventsStartOffsetsProvider.get() : null; // remembering the offsets before subscribing to states
Set<TopicPartitionInfo> statePartitions = withTopic(partitions, stateConsumer.getTopic());
@ -57,11 +63,17 @@ public class KafkaQueueStateService<E extends TbQueueMsg, S extends TbQueueMsg>
log.info("Finished partition {} (still in progress: {})", statePartition, partitionsInProgress);
if (partitionsInProgress.isEmpty()) {
log.info("All partitions processed");
if (whenAllProcessed != null) {
whenAllProcessed.run();
}
}
TopicPartitionInfo eventPartition = statePartition.withTopic(eventConsumer.getTopic());
if (this.partitions.get(queueKey).contains(eventPartition)) {
eventConsumer.addPartitions(Set.of(eventPartition), null, eventsStartOffsets != null ? eventsStartOffsets::get : null);
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
consumer.addPartitions(Set.of(statePartition.withTopic(consumer.getTopic())));
}
}
} finally {
readLock.unlock();

35
common/queue/src/main/java/org/thingsboard/server/queue/common/state/QueueStateService.java

@ -25,9 +25,9 @@ import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -37,19 +37,19 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop
public abstract class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
protected final PartitionedQueueConsumerManager<E> eventConsumer;
protected final List<PartitionedQueueConsumerManager<?>> otherConsumers;
@Getter
protected final Map<QueueKey, Set<TopicPartitionInfo>> partitions = new HashMap<>();
protected final Set<TopicPartitionInfo> partitionsInProgress = ConcurrentHashMap.newKeySet();
protected boolean initialized;
protected final ReadWriteLock partitionsLock = new ReentrantReadWriteLock();
protected QueueStateService(PartitionedQueueConsumerManager<E> eventConsumer) {
protected QueueStateService(PartitionedQueueConsumerManager<E> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
this.eventConsumer = eventConsumer;
this.otherConsumers = otherConsumers;
}
public void update(QueueKey queueKey, Set<TopicPartitionInfo> newPartitions) {
public void update(QueueKey queueKey, Set<TopicPartitionInfo> newPartitions, Runnable whenAllProcessed) {
newPartitions = withTopic(newPartitions, eventConsumer.getTopic());
var writeLock = partitionsLock.writeLock();
writeLock.lock();
@ -71,17 +71,29 @@ public abstract class QueueStateService<E extends TbQueueMsg, S extends TbQueueM
}
if (!addedPartitions.isEmpty()) {
addPartitions(queueKey, addedPartitions);
addPartitions(queueKey, addedPartitions, whenAllProcessed);
} else {
if (whenAllProcessed != null) {
whenAllProcessed.run();
}
}
initialized = true;
}
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
protected void addPartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions, Runnable whenAllProcessed) {
if (whenAllProcessed != null) {
whenAllProcessed.run();
}
eventConsumer.addPartitions(partitions);
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
consumer.addPartitions(withTopic(partitions, consumer.getTopic()));
}
}
protected void removePartitions(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
eventConsumer.removePartitions(partitions);
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
consumer.removePartitions(withTopic(partitions, consumer.getTopic()));
}
}
public void delete(Set<TopicPartitionInfo> partitions) {
@ -100,10 +112,9 @@ public abstract class QueueStateService<E extends TbQueueMsg, S extends TbQueueM
protected void deletePartitions(Set<TopicPartitionInfo> partitions) {
eventConsumer.delete(withTopic(partitions, eventConsumer.getTopic()));
}
public Set<TopicPartitionInfo> getPartitionsInProgress() {
return initialized ? partitionsInProgress : null;
for (PartitionedQueueConsumerManager<?> consumer : otherConsumers) {
consumer.removePartitions(withTopic(partitions, consumer.getTopic()));
}
}
public void stop() {

11
common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java

@ -68,6 +68,8 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
private List<ServiceType> serviceTypes;
private ServiceInfo serviceInfo;
private boolean ready = true;
@PostConstruct
public void init() {
if (StringUtils.isEmpty(serviceId)) {
@ -87,6 +89,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
assignedTenantProfiles = Collections.emptySet();
}
if (serviceTypes.contains(ServiceType.EDQS)) {
ready = false;
if (StringUtils.isBlank(edqsConfig.getLabel())) {
edqsConfig.setLabel(serviceId);
}
@ -128,9 +131,17 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList()));
}
builder.setLabel(edqsConfig.getLabel());
builder.setReady(ready);
return serviceInfo = builder.build();
}
@Override
public boolean setReady(boolean ready) {
boolean changed = this.ready != ready;
this.ready = ready;
return changed;
}
private TransportProtos.SystemInfoProto getCurrentSystemInfoProto() {
TransportProtos.SystemInfoProto.Builder builder = TransportProtos.SystemInfoProto.newBuilder();

2
common/queue/src/main/java/org/thingsboard/server/queue/discovery/DiscoveryService.java

@ -25,4 +25,6 @@ public interface DiscoveryService {
boolean isMonolith();
void setReady(boolean ready);
}

9
common/queue/src/main/java/org/thingsboard/server/queue/discovery/DummyDiscoveryService.java

@ -55,4 +55,13 @@ public class DummyDiscoveryService implements DiscoveryService {
public boolean isMonolith() {
return true;
}
@Override
public void setReady(boolean ready) {
boolean changed = serviceInfoProvider.setReady(ready);
if (changed) {
serviceInfoProvider.generateNewServiceInfoWithCurrentSystemInfo();
}
}
}

2
common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java

@ -35,4 +35,6 @@ public interface TbServiceInfoProvider {
Set<UUID> getAssignedTenantProfiles();
boolean setReady(boolean ready);
}

13
common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java

@ -177,6 +177,19 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
}
@Override
public void setReady(boolean ready) {
log.debug("Marking current service as {}", ready ? "ready" : "NOT ready");
boolean changed = serviceInfoProvider.setReady(ready);
if (changed) {
try {
publishCurrentServer();
} catch (Exception e) {
log.error("Failed to update server readiness status", e);
}
}
}
private boolean currentServerExists() {
if (nodePath == null) {
return false;

4
common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java

@ -44,6 +44,10 @@ public class EdqsConfig {
private int maxPendingRequests;
@Value("${queue.edqs.max_request_timeout:20000}")
private int maxRequestTimeout;
@Value("${queue.edqs.request_executor_size:50}")
private int requestExecutorSize;
@Value("${queue.edqs.versions_cache_ttl:60}")
private int versionsCacheTtl;
public String getLabel() {
if (partitioningStrategy == EdqsPartitioningStrategy.NONE) {

70
common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsExecutors.java

@ -0,0 +1,70 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.edqs;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Lazy
@Component
@Getter
@RequiredArgsConstructor
public class EdqsExecutors {
private final EdqsConfig edqsConfig;
private ExecutorService consumersExecutor;
private ExecutorService consumerTaskExecutor;
private ScheduledExecutorService scheduler;
private ListeningExecutorService requestExecutor;
@PostConstruct
private void init() {
consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer"));
consumerTaskExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-consumer-task-executor");
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-scheduler");
requestExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(edqsConfig.getRequestExecutorSize(), "edqs-requests"));
}
@PreDestroy
private void destroy() {
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
if (consumerTaskExecutor != null) {
consumerTaskExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
if (requestExecutor != null) {
requestExecutor.shutdownNow();
}
}
}

5
common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsQueueFactory.java

@ -19,8 +19,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface EdqsQueueFactory {
@ -33,7 +34,7 @@ public interface EdqsQueueFactory {
TbQueueProducer<TbProtoQueueMsg<ToEdqsMsg>> createEdqsStateProducer();
TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate();
PartitionedQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate(TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> handler);
TbQueueAdmin getEdqsQueueAdmin();

26
common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsQueueFactory.java

@ -17,16 +17,15 @@ package org.thingsboard.server.queue.edqs;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate;
import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
@ -39,6 +38,7 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
private final InMemoryStorage storage;
private final EdqsConfig edqsConfig;
private final EdqsExecutors edqsExecutors;
private final StatsFactory statsFactory;
private final TbQueueAdmin queueAdmin;
@ -63,17 +63,21 @@ public class InMemoryEdqsQueueFactory implements EdqsQueueFactory {
}
@Override
public TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate() {
TbQueueConsumer<TbProtoQueueMsg<ToEdqsMsg>> requestConsumer = new InMemoryTbQueueConsumer<>(storage, edqsConfig.getRequestsTopic());
public PartitionedQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate(TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> handler) {
TbQueueProducer<TbProtoQueueMsg<FromEdqsMsg>> responseProducer = new InMemoryTbQueueProducer<>(storage, edqsConfig.getResponsesTopic());
return DefaultTbQueueResponseTemplate.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>>builder()
.requestTemplate(requestConsumer)
.responseTemplate(responseProducer)
.maxPendingRequests(edqsConfig.getMaxPendingRequests())
.requestTimeout(edqsConfig.getMaxRequestTimeout())
return PartitionedQueueResponseTemplate.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>>builder()
.key("edqs")
.handler(handler)
.requestsTopic(edqsConfig.getRequestsTopic())
.consumerCreator(tpi -> new InMemoryTbQueueConsumer<>(storage, edqsConfig.getRequestsTopic()))
.responseProducer(responseProducer)
.pollInterval(edqsConfig.getPollInterval())
.requestTimeout(edqsConfig.getMaxRequestTimeout())
.maxPendingRequests(edqsConfig.getMaxPendingRequests())
.consumerExecutor(edqsExecutors.getConsumersExecutor())
.callbackExecutor(edqsExecutors.getRequestExecutor())
.consumerTaskExecutor(edqsExecutors.getConsumerTaskExecutor())
.stats(statsFactory.createMessagesStats(StatsType.EDQS.getName()))
.executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs"))
.build();
}

35
common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsQueueFactory.java

@ -16,14 +16,13 @@
package org.thingsboard.server.queue.edqs;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueResponseTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueResponseTemplate;
import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
@ -45,6 +44,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
private final TbKafkaAdmin edqsRequestsAdmin;
private final TbKafkaAdmin edqsStateAdmin;
private final EdqsConfig edqsConfig;
private final EdqsExecutors edqsExecutors;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TopicService topicService;
@ -53,7 +53,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
private final AtomicInteger consumerCounter = new AtomicInteger();
public KafkaEdqsQueueFactory(TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs topicConfigs,
EdqsConfig edqsConfig, TbServiceInfoProvider serviceInfoProvider,
EdqsConfig edqsConfig, EdqsExecutors edqsExecutors, TbServiceInfoProvider serviceInfoProvider,
TbKafkaConsumerStatsService consumerStatsService, TopicService topicService,
StatsFactory statsFactory) {
this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsEventsConfigs());
@ -61,6 +61,7 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
this.edqsStateAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsStateConfigs());
this.kafkaSettings = kafkaSettings;
this.edqsConfig = edqsConfig;
this.edqsExecutors = edqsExecutors;
this.serviceInfoProvider = serviceInfoProvider;
this.consumerStatsService = consumerStatsService;
this.topicService = topicService;
@ -116,25 +117,29 @@ public class KafkaEdqsQueueFactory implements EdqsQueueFactory {
}
@Override
public TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate() {
var requestConsumer = createEdqsMsgConsumer(edqsConfig.getRequestsTopic(),
"edqs-requests-consumer-" + serviceInfoProvider.getServiceId(),
"edqs-requests-consumer-group",
false, edqsRequestsAdmin);
public PartitionedQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> createEdqsResponseTemplate(TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> handler) {
var responseProducer = TbKafkaProducerTemplate.<TbProtoQueueMsg<FromEdqsMsg>>builder()
.settings(kafkaSettings)
.clientId("edqs-response-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(edqsConfig.getResponsesTopic()))
.admin(edqsRequestsAdmin)
.build();
return DefaultTbQueueResponseTemplate.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>>builder()
.requestTemplate(requestConsumer)
.responseTemplate(responseProducer)
.maxPendingRequests(edqsConfig.getMaxPendingRequests())
.requestTimeout(edqsConfig.getMaxRequestTimeout())
return PartitionedQueueResponseTemplate.<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>>builder()
.key("edqs")
.handler(handler)
.requestsTopic(topicService.buildTopicName(edqsConfig.getRequestsTopic()))
.consumerCreator(tpi -> createEdqsMsgConsumer(edqsConfig.getRequestsTopic(),
"edqs-requests-consumer-" + serviceInfoProvider.getServiceId() + "-" + tpi.getPartition().orElse(999),
"edqs-requests-consumer-group",
false, edqsRequestsAdmin))
.responseProducer(responseProducer)
.pollInterval(edqsConfig.getPollInterval())
.requestTimeout(edqsConfig.getMaxRequestTimeout())
.maxPendingRequests(edqsConfig.getMaxPendingRequests())
.consumerExecutor(edqsExecutors.getConsumersExecutor())
.callbackExecutor(edqsExecutors.getRequestExecutor())
.consumerTaskExecutor(edqsExecutors.getConsumerTaskExecutor())
.stats(statsFactory.createMessagesStats(StatsType.EDQS.getName()))
.executor(ThingsBoardExecutors.newWorkStealingPool(5, "edqs"))
.build();
}

3
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java

@ -93,8 +93,7 @@ public class TbKafkaConsumerStatsService {
log.info("[{}] Topic partitions with lag: [{}].", groupId, builder.toString());
}
} catch (Exception e) {
log.warn("[{}] Failed to get consumer group stats. Reason - {}.", groupId, e.getMessage());
log.trace("Detailed error: ", e);
log.warn("[{}] Failed to get consumer group stats", groupId, e);
}
}

4
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java

@ -75,10 +75,6 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
topics = ConcurrentHashMap.newKeySet();
}
@Override
public void init() {
}
void addAnalyticHeaders(List<Header> headers) {
headers.add(new RecordHeader("_producerId", getClientId().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("_threadName", Thread.currentThread().getName().getBytes(StandardCharsets.UTF_8)));

5
common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueProducer.java

@ -33,11 +33,6 @@ public class InMemoryTbQueueProducer<T extends TbQueueMsg> implements TbQueuePro
this.defaultTopic = defaultTopic;
}
@Override
public void init() {
}
@Override
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
boolean result = storage.put(tpi.getFullTopicName(), msg);

19
common/queue/src/test/java/org/thingsboard/server/queue/common/DefaultTbQueueRequestTemplateTest.java

@ -37,11 +37,23 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.*;
import static org.mockito.Mockito.*;
import static org.mockito.BDDMockito.RETURNS_DEEP_STUBS;
import static org.mockito.BDDMockito.atLeastOnce;
import static org.mockito.BDDMockito.lenient;
import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.spy;
import static org.mockito.BDDMockito.times;
import static org.mockito.BDDMockito.verify;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willDoNothing;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.hamcrest.MockitoHamcrest.longThat;
@Slf4j
@ -96,7 +108,6 @@ public class DefaultTbQueueRequestTemplateTest {
inst.init();
assertThat(inst.nextCleanupNs, equalTo(0L));
verify(queueAdmin, times(1)).createTopicIfNotExists(topic);
verify(requestTemplate, times(1)).init();
verify(responseTemplate, times(1)).subscribe();
verify(executorMock, times(1)).submit(any(Runnable.class));

10
dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java

@ -47,6 +47,7 @@ import org.thingsboard.server.common.data.query.EntityTypeFilter;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.common.stats.EdqsStatsService;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@ -88,6 +89,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
@Lazy
EntityServiceRegistry entityServiceRegistry;
@Autowired
private EdqsService edqsService;
@Autowired
@Lazy
private EdqsApiService edqsApiService;
@ -104,7 +108,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
long startNs = System.nanoTime();
Long result;
if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
if (edqsService.isApiEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
EdqsRequest request = EdqsRequest.builder()
.entityCountQuery(query)
.build();
@ -126,7 +130,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
long startNs = System.nanoTime();
PageData<EntityData> result;
if (edqsApiService.isEnabled() && validForEdqs(query)) {
if (edqsService.isApiEnabled() && validForEdqs(query)) {
EdqsRequest request = EdqsRequest.builder()
.entityDataQuery(query)
.build();
@ -295,7 +299,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
}
if ((query.getEntityFields() == null || query.getEntityFields().isEmpty()) &&
(query.getLatestValues() == null || query.getLatestValues().isEmpty())) {
(query.getLatestValues() == null || query.getLatestValues().isEmpty())) {
return false;
}

15
dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsApiService.java

@ -35,24 +35,9 @@ public class DummyEdqsApiService implements EdqsApiService {
throw new UnsupportedOperationException();
}
@Override
public boolean isEnabled() {
return false;
}
@Override
public void setEnabled(boolean enabled) {
log.warn("Got request to enable EDQS API, but it isn't supported", new RuntimeException("stacktrace"));
}
@Override
public boolean isSupported() {
return false;
}
@Override
public boolean isAutoEnable() {
return false;
}
}

11
dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java

@ -19,6 +19,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
import org.thingsboard.server.common.data.id.EntityId;
@ -47,4 +48,14 @@ public class DummyEdqsService implements EdqsService {
@Override
public void processSystemMsg(ToCoreEdqsMsg request) {}
@Override
public boolean isApiEnabled() {
return getState().isApiEnabled();
}
@Override
public EdqsState getState() {
return new EdqsState();
}
}

4
edqs/src/main/resources/edqs.yml

@ -71,6 +71,10 @@ queue:
max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}"
# Maximum timeout for requests to EDQS
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
# Thread pool size for EDQS requests executor
request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}"
# Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process.
versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}"
# Strings longer than this threshold will be compressed
string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}"
stats:

11
edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java

@ -60,7 +60,8 @@ import org.thingsboard.server.common.data.query.StringFilterPredicate;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.stats.DummyEdqsStatsService;
import org.thingsboard.server.edqs.util.EdqsConverter;
import org.thingsboard.server.edqs.util.DefaultEdqsMapper;
import org.thingsboard.server.edqs.util.EdqsMapper;
import java.util.Collections;
import java.util.List;
@ -79,7 +80,7 @@ public abstract class AbstractEDQTest {
@Autowired
protected DefaultEdqsRepository repository;
@Autowired
protected EdqsConverter edqsConverter;
protected EdqsMapper edqsMapper;
@MockBean
private DummyEdqsStatsService edqsStatsService;
@ -244,12 +245,12 @@ public abstract class AbstractEDQTest {
}
protected void addOrUpdate(EntityType entityType, Object entity) {
addOrUpdate(EdqsConverter.toEntity(entityType, entity));
addOrUpdate(DefaultEdqsMapper.toEntity(entityType, entity));
}
protected void addOrUpdate(EdqsObject edqsObject) {
byte[] serialized = edqsConverter.serialize(edqsObject.type(), edqsObject);
edqsObject = edqsConverter.deserialize(edqsObject.type(), serialized);
byte[] serialized = edqsMapper.serialize(edqsObject);
edqsObject = edqsMapper.deserialize(edqsObject.type(), serialized, false);
repository.get(tenantId).addOrUpdate(edqsObject);
}

7
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java

@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
@ -738,13 +739,13 @@ public class TestRestClient {
.as(Long.class);
}
public Boolean isEdqsApiEnabled() {
public EdqsState getEdqsState() {
return given().spec(requestSpec)
.get("/api/edqs/enabled")
.get("/api/edqs/state")
.then()
.statusCode(HTTP_OK)
.extract()
.as(Boolean.class);
.as(EdqsState.class);
}
public void assignDeviceToCustomer(CustomerId customerId, DeviceId id) {

2
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/edqs/EdqsEntityDataQueryTest.java

@ -73,7 +73,7 @@ public class EdqsEntityDataQueryTest extends AbstractContainerTest {
@BeforeClass
public void beforeClass() throws Exception {
testRestClient.login("sysadmin@thingsboard.org", "sysadmin");
await().atMost(60, TimeUnit.SECONDS).until(() -> testRestClient.isEdqsApiEnabled());
await().atMost(60, TimeUnit.SECONDS).until(() -> testRestClient.getEdqsState().isApiEnabled());
tenantId = testRestClient.postTenant(EntityPrototypes.defaultTenantPrototype("Tenant")).getId();
tenantAdminId = testRestClient.createUserAndLogin(defaultTenantAdmin(tenantId, "tenantAdmin@thingsboard.org"), "tenant");

Loading…
Cancel
Save