From fa7da7f96f2ac96553e028ee8f355d1c2bde439b Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 14 May 2021 18:39:50 +0300 Subject: [PATCH] state service: prevent initDeviceStatesForPartitions if added NO partitions --- .../state/DefaultDeviceStateService.java | 88 +++++++++++-------- 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index f3798ecc5b..271dc11e67 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -367,45 +367,8 @@ public class DefaultDeviceStateService extends TbApplicationEventListener partitionedDevices.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); - //TODO 3.0: replace this dummy search with new functionality to search by partitions using SQL capabilities. - // Adding only devices that are in new partitions - List tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData(); - for (Tenant tenant : tenants) { - log.debug("Finding devices for tenant [{}]", tenant.getName()); - PageLink pageLink = new PageLink(initFetchPackSize); - while (pageLink != null) { - List> fetchFutures = new ArrayList<>(); - PageData page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); - pageLink = page.hasNext() ? pageLink.nextPageLink() : null; - for (Device device : page.getData()) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); - if (addedPartitions.contains(tpi)) { - log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); - ListenableFuture future = Futures.transform(fetchDeviceState(device), new Function() { - @Nullable - @Override - public Void apply(@Nullable DeviceStateData state) { - log.debug("[{}][{}] Fetched state from DB [{}]", device.getName(), device.getId(), state); - if (state != null) { - addDeviceUsingState(tpi, state); - } else { - log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId()); - } - return null; - } - }, dbCallbackExecutorService); - fetchFutures.add(future); - } else { - log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi); - } - } - try { - Futures.successfulAsList(fetchFutures).get(); - } catch (InterruptedException | ExecutionException e) { - log.warn("Failed to init device state service from DB", e); - } - } - } + initDeviceStatesForPartitions(addedPartitions); + log.info("Managing following partitions:"); partitionedDevices.forEach((tpi, devices) -> { log.info("[{}]: {} devices", tpi.getFullTopicName(), devices.size()); @@ -415,6 +378,53 @@ public class DefaultDeviceStateService extends TbApplicationEventListener addedPartitions) { + if (addedPartitions.isEmpty()) { + return false; + } + + List tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData(); + for (Tenant tenant : tenants) { + log.debug("Finding devices for tenant [{}]", tenant.getName()); + PageLink pageLink = new PageLink(initFetchPackSize); + while (pageLink != null) { + List> fetchFutures = new ArrayList<>(); + PageData page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); + pageLink = page.hasNext() ? pageLink.nextPageLink() : null; + for (Device device : page.getData()) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); + if (addedPartitions.contains(tpi)) { + log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); + ListenableFuture future = Futures.transform(fetchDeviceState(device), new Function() { + @Nullable + @Override + public Void apply(@Nullable DeviceStateData state) { + log.debug("[{}][{}] Fetched state from DB [{}]", device.getName(), device.getId(), state); + if (state != null) { + addDeviceUsingState(tpi, state); + } else { + log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId()); + } + return null; + } + }, dbCallbackExecutorService); + fetchFutures.add(future); + } else { + log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi); + } + } + try { + Futures.successfulAsList(fetchFutures).get(); + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to init device state service from DB", e); + } + } + } + return true; + } + private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) { partitionedDevices.computeIfAbsent(tpi, id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId()); deviceStates.put(state.getDeviceId(), state);