Browse Source

Device State Service improvements and race condition fix

pull/6131/head
Andrii Shvaika 4 years ago
parent
commit
45df14e39e
  1. 8
      application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java
  2. 56
      application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java

8
application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java

@ -120,11 +120,9 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
onAddedPartitions(addedPartitions);
}
scheduledExecutor.submit(() -> {
log.info("Managing following partitions:");
partitionedEntities.forEach((tpi, entities) -> {
log.info("[{}]: {} entities", tpi.getFullTopicName(), entities.size());
});
log.info("Managing following partitions:");
partitionedEntities.forEach((tpi, entities) -> {
log.info("[{}]: {} entities", tpi.getFullTopicName(), entities.size());
});
} catch (Throwable t) {
log.warn("Failed to init entities state from DB", t);

56
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java

@ -175,6 +175,9 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
return;
}
log.trace("on Device Connect [{}]", deviceId.getId());
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
long ts = System.currentTimeMillis();
@ -182,17 +185,19 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
save(deviceId, LAST_CONNECT_TIME, ts);
pushRuleEngineMessage(stateData, CONNECT_EVENT);
checkAndUpdateState(deviceId, stateData);
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long lastReportedActivity) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
return;
}
log.trace("on Device Activity [{}], lastReportedActivity [{}]", deviceId.getId(), lastReportedActivity);
final DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (lastReportedActivity > 0 && lastReportedActivity > stateData.getState().getLastActivityTime()) {
updateActivityState(deviceId, stateData, lastReportedActivity);
}
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
}
void updateActivityState(DeviceId deviceId, DeviceStateData stateData, long lastReportedActivity) {
@ -214,12 +219,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
return;
}
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
long ts = System.currentTimeMillis();
stateData.getState().setLastDisconnectTime(ts);
save(deviceId, LAST_DISCONNECT_TIME, ts);
pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
}
@Override
@ -227,11 +234,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
if (inactivityTimeout <= 0L) {
return;
}
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
return;
}
log.trace("on Device Activity Timeout Update device id {} inactivityTimeout {}", deviceId, inactivityTimeout);
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
stateData.getState().setInactivityTimeout(inactivityTimeout);
checkAndUpdateState(deviceId, stateData);
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
}
@Override
@ -292,19 +302,19 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
for (Tenant tenant : tenants) {
log.debug("Finding devices for tenant [{}]", tenant.getName());
final PageLink pageLink = new PageLink(initFetchPackSize);
scheduledExecutor.submit(() -> processPageAndSubmitNextPage(addedPartitions, tenant, pageLink, scheduledExecutor));
processPageAndSubmitNextPage(addedPartitions, tenant, pageLink);
}
}
private void processPageAndSubmitNextPage(final Set<TopicPartitionInfo> addedPartitions, final Tenant tenant, final PageLink pageLink, final ExecutorService executor) {
private void processPageAndSubmitNextPage(final Set<TopicPartitionInfo> addedPartitions, final Tenant tenant, final PageLink pageLink) {
log.trace("[{}] Process page {} from {}", tenant, pageLink.getPage(), pageLink.getPageSize());
List<ListenableFuture<Void>> fetchFutures = new ArrayList<>();
PageData<Device> page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink);
for (Device device : page.getData()) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId());
if (addedPartitions.contains(tpi)) {
if (addedPartitions.contains(tpi) && !deviceStates.containsKey(device.getId())) {
log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi);
ListenableFuture<Void> future = Futures.transform(fetchDeviceState(device), new Function<DeviceStateData, Void>() {
ListenableFuture<Void> future = Futures.transform(fetchDeviceState(device), new Function<>() {
@Nullable
@Override
public Void apply(@Nullable DeviceStateData state) {
@ -323,7 +333,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
Futures.addCallback(Futures.successfulAsList(fetchFutures), new FutureCallback<List<Void>>() {
Futures.addCallback(Futures.successfulAsList(fetchFutures), new FutureCallback<>() {
@Override
public void onSuccess(List<Void> result) {
log.trace("[{}] Success init device state from DB for batch size {}", tenant.getId(), result.size());
@ -339,7 +349,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
final PageLink nextPageLink = page.hasNext() ? pageLink.nextPageLink() : null;
if (nextPageLink != null) {
log.trace("[{}] Submit next page {} from {}", tenant, nextPageLink.getPage(), nextPageLink.getPageSize());
executor.submit(() -> processPageAndSubmitNextPage(addedPartitions, tenant, nextPageLink, executor));
processPageAndSubmitNextPage(addedPartitions, tenant, nextPageLink);
}
}
@ -358,7 +368,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
if (deviceIds != null) {
deviceIds.add(state.getDeviceId());
deviceStates.put(state.getDeviceId(), state);
deviceStates.putIfAbsent(state.getDeviceId(), state);
} else {
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
throw new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!");
@ -384,12 +394,18 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
log.trace("Processing state {} for device {}", stateData, deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
if (!isActive(ts, state) && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
state.setActive(false);
state.setLastInactivityAlarmTime(ts);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
save(deviceId, ACTIVITY_STATE, false);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
if (!isActive(ts, state)
&& (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())
&& stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
if (partitionService.resolve(ServiceType.TB_CORE, stateData.getTenantId(), deviceId).isMyPartition()) {
state.setActive(false);
state.setLastInactivityAlarmTime(ts);
save(deviceId, ACTIVITY_STATE, false);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
} else {
cleanupEntity(deviceId);
}
}
} else {
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
@ -426,13 +442,15 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
private void cleanDeviceStateIfBelongsExternalPartition(TenantId tenantId, final DeviceId deviceId) {
private boolean cleanDeviceStateIfBelongsExternalPartition(TenantId tenantId, final DeviceId deviceId) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
if (!partitionedEntities.containsKey(tpi)) {
boolean cleanup = !partitionedEntities.containsKey(tpi);
if (cleanup) {
cleanupEntity(deviceId);
log.debug("[{}][{}] device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
, tenantId, deviceId, tpi.getFullTopicName());
}
return cleanup;
}
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {

Loading…
Cancel
Save