|
|
|
@ -135,6 +135,8 @@ public class DefaultDeviceStateService implements DeviceStateService { |
|
|
|
@Getter |
|
|
|
private int initFetchPackSize; |
|
|
|
|
|
|
|
private volatile boolean clusterUpdatePending = false; |
|
|
|
|
|
|
|
private ListeningScheduledExecutorService queueExecutor; |
|
|
|
private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>(); |
|
|
|
private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>(); |
|
|
|
@ -192,7 +194,10 @@ public class DefaultDeviceStateService implements DeviceStateService { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onClusterUpdate() { |
|
|
|
queueExecutor.submit(this::onClusterUpdateSync); |
|
|
|
if (!clusterUpdatePending) { |
|
|
|
clusterUpdatePending = true; |
|
|
|
queueExecutor.submit(this::onClusterUpdateSync); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -220,6 +225,7 @@ public class DefaultDeviceStateService implements DeviceStateService { |
|
|
|
} |
|
|
|
|
|
|
|
private void onClusterUpdateSync() { |
|
|
|
clusterUpdatePending = false; |
|
|
|
List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData(); |
|
|
|
for (Tenant tenant : tenants) { |
|
|
|
List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>(); |
|
|
|
|