diff --git a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java index 632fb9938b..18a0a1f220 100644 --- a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java +++ b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java @@ -120,11 +120,9 @@ public abstract class AbstractPartitionBasedService extends onAddedPartitions(addedPartitions); } - scheduledExecutor.submit(() -> { - log.info("Managing following partitions:"); - partitionedEntities.forEach((tpi, entities) -> { - log.info("[{}]: {} entities", tpi.getFullTopicName(), entities.size()); - }); + log.info("Managing following partitions:"); + partitionedEntities.forEach((tpi, entities) -> { + log.info("[{}]: {} entities", tpi.getFullTopicName(), entities.size()); }); } catch (Throwable t) { log.warn("Failed to init entities state from DB", t); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 1d6b0dcb4b..10f1fcea85 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -175,6 +175,9 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService 0 && lastReportedActivity > stateData.getState().getLastActivityTime()) { updateActivityState(deviceId, stateData, lastReportedActivity); } - cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId); } void updateActivityState(DeviceId deviceId, DeviceStateData stateData, long lastReportedActivity) { @@ -214,12 +219,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService processPageAndSubmitNextPage(addedPartitions, tenant, pageLink, scheduledExecutor)); + processPageAndSubmitNextPage(addedPartitions, tenant, pageLink); } } - private void processPageAndSubmitNextPage(final Set addedPartitions, final Tenant tenant, final PageLink pageLink, final ExecutorService executor) { + private void processPageAndSubmitNextPage(final Set addedPartitions, final Tenant tenant, final PageLink pageLink) { log.trace("[{}] Process page {} from {}", tenant, pageLink.getPage(), pageLink.getPageSize()); List> fetchFutures = new ArrayList<>(); PageData page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); for (Device device : page.getData()) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); - if (addedPartitions.contains(tpi)) { + if (addedPartitions.contains(tpi) && !deviceStates.containsKey(device.getId())) { log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); - ListenableFuture future = Futures.transform(fetchDeviceState(device), new Function() { + ListenableFuture future = Futures.transform(fetchDeviceState(device), new Function<>() { @Nullable @Override public Void apply(@Nullable DeviceStateData state) { @@ -323,7 +333,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService>() { + Futures.addCallback(Futures.successfulAsList(fetchFutures), new FutureCallback<>() { @Override public void onSuccess(List result) { log.trace("[{}] Success init device state from DB for batch size {}", tenant.getId(), result.size()); @@ -339,7 +349,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService processPageAndSubmitNextPage(addedPartitions, tenant, nextPageLink, executor)); + processPageAndSubmitNextPage(addedPartitions, tenant, nextPageLink); } } @@ -358,7 +368,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceIds = partitionedEntities.get(tpi); if (deviceIds != null) { deviceIds.add(state.getDeviceId()); - deviceStates.put(state.getDeviceId(), state); + deviceStates.putIfAbsent(state.getDeviceId(), state); } else { log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName()); throw new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!"); @@ -384,12 +394,18 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService