|
|
|
@ -367,45 +367,8 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit |
|
|
|
|
|
|
|
addedPartitions.forEach(tpi -> partitionedDevices.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); |
|
|
|
|
|
|
|
//TODO 3.0: replace this dummy search with new functionality to search by partitions using SQL capabilities.
|
|
|
|
// Adding only devices that are in new partitions
|
|
|
|
List<Tenant> tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData(); |
|
|
|
for (Tenant tenant : tenants) { |
|
|
|
log.debug("Finding devices for tenant [{}]", tenant.getName()); |
|
|
|
PageLink pageLink = new PageLink(initFetchPackSize); |
|
|
|
while (pageLink != null) { |
|
|
|
List<ListenableFuture<Void>> fetchFutures = new ArrayList<>(); |
|
|
|
PageData<Device> page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); |
|
|
|
pageLink = page.hasNext() ? pageLink.nextPageLink() : null; |
|
|
|
for (Device device : page.getData()) { |
|
|
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); |
|
|
|
if (addedPartitions.contains(tpi)) { |
|
|
|
log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); |
|
|
|
ListenableFuture<Void> future = Futures.transform(fetchDeviceState(device), new Function<DeviceStateData, Void>() { |
|
|
|
@Nullable |
|
|
|
@Override |
|
|
|
public Void apply(@Nullable DeviceStateData state) { |
|
|
|
log.debug("[{}][{}] Fetched state from DB [{}]", device.getName(), device.getId(), state); |
|
|
|
if (state != null) { |
|
|
|
addDeviceUsingState(tpi, state); |
|
|
|
} else { |
|
|
|
log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId()); |
|
|
|
} |
|
|
|
return null; |
|
|
|
} |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
fetchFutures.add(future); |
|
|
|
} else { |
|
|
|
log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi); |
|
|
|
} |
|
|
|
} |
|
|
|
try { |
|
|
|
Futures.successfulAsList(fetchFutures).get(); |
|
|
|
} catch (InterruptedException | ExecutionException e) { |
|
|
|
log.warn("Failed to init device state service from DB", e); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
initDeviceStatesForPartitions(addedPartitions); |
|
|
|
|
|
|
|
log.info("Managing following partitions:"); |
|
|
|
partitionedDevices.forEach((tpi, devices) -> { |
|
|
|
log.info("[{}]: {} devices", tpi.getFullTopicName(), devices.size()); |
|
|
|
@ -415,6 +378,53 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//TODO 3.0: replace this dummy search with new functionality to search by partitions using SQL capabilities.
|
|
|
|
// Adding only devices that are in new partitions
|
|
|
|
boolean initDeviceStatesForPartitions(Set<TopicPartitionInfo> addedPartitions) { |
|
|
|
if (addedPartitions.isEmpty()) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
List<Tenant> tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData(); |
|
|
|
for (Tenant tenant : tenants) { |
|
|
|
log.debug("Finding devices for tenant [{}]", tenant.getName()); |
|
|
|
PageLink pageLink = new PageLink(initFetchPackSize); |
|
|
|
while (pageLink != null) { |
|
|
|
List<ListenableFuture<Void>> fetchFutures = new ArrayList<>(); |
|
|
|
PageData<Device> page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); |
|
|
|
pageLink = page.hasNext() ? pageLink.nextPageLink() : null; |
|
|
|
for (Device device : page.getData()) { |
|
|
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); |
|
|
|
if (addedPartitions.contains(tpi)) { |
|
|
|
log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); |
|
|
|
ListenableFuture<Void> future = Futures.transform(fetchDeviceState(device), new Function<DeviceStateData, Void>() { |
|
|
|
@Nullable |
|
|
|
@Override |
|
|
|
public Void apply(@Nullable DeviceStateData state) { |
|
|
|
log.debug("[{}][{}] Fetched state from DB [{}]", device.getName(), device.getId(), state); |
|
|
|
if (state != null) { |
|
|
|
addDeviceUsingState(tpi, state); |
|
|
|
} else { |
|
|
|
log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId()); |
|
|
|
} |
|
|
|
return null; |
|
|
|
} |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
fetchFutures.add(future); |
|
|
|
} else { |
|
|
|
log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi); |
|
|
|
} |
|
|
|
} |
|
|
|
try { |
|
|
|
Futures.successfulAsList(fetchFutures).get(); |
|
|
|
} catch (InterruptedException | ExecutionException e) { |
|
|
|
log.warn("Failed to init device state service from DB", e); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) { |
|
|
|
partitionedDevices.computeIfAbsent(tpi, id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId()); |
|
|
|
deviceStates.put(state.getDeviceId(), state); |
|
|
|
|