Browse Source

Add callback executor for device state service

pull/10380/head
ViacheslavKlimov 2 years ago
parent
commit
50f8951072
  1. 16
      application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java

16
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java

@ -191,6 +191,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
private int telemetryTtl;
private ListeningExecutorService deviceStateExecutor;
private ListeningExecutorService deviceStateCallbackExecutor;
final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
@ -199,6 +200,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
super.init();
deviceStateExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
Math.max(4, Runtime.getRuntime().availableProcessors()), "device-state"));
deviceStateCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
Math.max(4, Runtime.getRuntime().availableProcessors()), "device-state-callback"));
scheduledExecutor.scheduleWithFixedDelay(this::checkStates, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
scheduledExecutor.scheduleWithFixedDelay(this::reportActivityStats, defaultActivityStatsIntervalInSec, defaultActivityStatsIntervalInSec, TimeUnit.SECONDS);
}
@ -209,6 +212,9 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
if (deviceStateExecutor != null) {
deviceStateExecutor.shutdownNow();
}
if (deviceStateCallbackExecutor != null) {
deviceStateCallbackExecutor.shutdownNow();
}
}
@Override
@ -372,7 +378,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
log.warn("Failed to register device to the state service", t);
callback.onFailure(t);
}
}, deviceStateExecutor);
}, deviceStateCallbackExecutor);
} else if (proto.getUpdated()) {
DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
TbMsgMetaData md = new TbMsgMetaData();
@ -635,10 +641,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
ListenableFuture<DeviceStateData> future;
if (persistToTelemetry) {
ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
future = Futures.transform(tsData, extractDeviceStateData(device), deviceStateExecutor);
future = Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
} else {
ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
future = Futures.transform(attrData, extractDeviceStateData(device), deviceStateExecutor);
future = Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
}
return transformInactivityTimeout(future);
}
@ -656,8 +662,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
});
return deviceStateData;
}, deviceStateExecutor);
}, deviceStateExecutor);
}, MoreExecutors.directExecutor());
}, deviceStateCallbackExecutor);
}
private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {

Loading…
Cancel
Save