|
|
|
@ -29,10 +29,13 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; |
|
|
|
import org.thingsboard.server.common.data.ApiUsageState; |
|
|
|
import org.thingsboard.server.common.data.ApiUsageStateMailMessage; |
|
|
|
import org.thingsboard.server.common.data.ApiUsageStateValue; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.Tenant; |
|
|
|
import org.thingsboard.server.common.data.TenantProfile; |
|
|
|
import org.thingsboard.server.common.data.exception.ThingsboardException; |
|
|
|
import org.thingsboard.server.common.data.id.ApiUsageStateId; |
|
|
|
import org.thingsboard.server.common.data.id.CustomerId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantProfileId; |
|
|
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|
|
|
@ -45,6 +48,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; |
|
|
|
import org.thingsboard.server.common.msg.queue.ServiceType; |
|
|
|
import org.thingsboard.server.common.msg.queue.TbCallback; |
|
|
|
import org.thingsboard.server.common.msg.tools.SchedulerUtils; |
|
|
|
import org.thingsboard.server.dao.customer.CustomerService; |
|
|
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
|
|
|
import org.thingsboard.server.dao.tenant.TenantService; |
|
|
|
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|
|
|
@ -63,6 +67,7 @@ import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
@ -95,6 +100,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
private final TbClusterService clusterService; |
|
|
|
private final PartitionService partitionService; |
|
|
|
private final TenantService tenantService; |
|
|
|
private final CustomerService customerService; |
|
|
|
private final TimeseriesService tsService; |
|
|
|
private final ApiUsageStateService apiUsageStateService; |
|
|
|
private final SchedulerComponent scheduler; |
|
|
|
@ -105,10 +111,12 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
@Autowired |
|
|
|
private InternalTelemetryService tsWsService; |
|
|
|
|
|
|
|
// Tenants that should be processed on this server
|
|
|
|
private final Map<TenantId, TenantApiUsageState> myTenantStates = new ConcurrentHashMap<>(); |
|
|
|
// Tenants that should be processed on other servers
|
|
|
|
private final Map<TenantId, ApiUsageState> otherTenantStates = new ConcurrentHashMap<>(); |
|
|
|
// Entities that should be processed on this server
|
|
|
|
private final Map<EntityId, BaseApiUsageState> myUsageStates = new ConcurrentHashMap<>(); |
|
|
|
// Entities that should be processed on other servers
|
|
|
|
private final Map<EntityId, ApiUsageState> otherUsageStates = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
private final Set<EntityId> deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
|
|
|
|
|
|
|
@Value("${usage.stats.report.enabled:true}") |
|
|
|
private boolean enabled; |
|
|
|
@ -123,13 +131,16 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
public DefaultTbApiUsageStateService(TbClusterService clusterService, |
|
|
|
PartitionService partitionService, |
|
|
|
TenantService tenantService, |
|
|
|
CustomerService customerService, |
|
|
|
TimeseriesService tsService, |
|
|
|
ApiUsageStateService apiUsageStateService, |
|
|
|
SchedulerComponent scheduler, |
|
|
|
TbTenantProfileCache tenantProfileCache, MailService mailService) { |
|
|
|
TbTenantProfileCache tenantProfileCache, |
|
|
|
MailService mailService) { |
|
|
|
this.clusterService = clusterService; |
|
|
|
this.partitionService = partitionService; |
|
|
|
this.tenantService = tenantService; |
|
|
|
this.customerService = customerService; |
|
|
|
this.tsService = tsService; |
|
|
|
this.apiUsageStateService = apiUsageStateService; |
|
|
|
this.scheduler = scheduler; |
|
|
|
@ -150,74 +161,92 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
@Override |
|
|
|
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) { |
|
|
|
ToUsageStatsServiceMsg statsMsg = msg.getValue(); |
|
|
|
TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); |
|
|
|
|
|
|
|
if (tenantProfileCache.get(tenantId) == null) { |
|
|
|
return; |
|
|
|
TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); |
|
|
|
EntityId entityId; |
|
|
|
if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) { |
|
|
|
entityId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB())); |
|
|
|
} else { |
|
|
|
entityId = tenantId; |
|
|
|
} |
|
|
|
|
|
|
|
TenantApiUsageState tenantState; |
|
|
|
processEntityUsageStats(tenantId, entityId, statsMsg.getValuesList()); |
|
|
|
callback.onSuccess(); |
|
|
|
} |
|
|
|
|
|
|
|
private void processEntityUsageStats(TenantId tenantId, EntityId entityId, List<UsageStatsKVProto> values) { |
|
|
|
if (deletedEntities.contains(entityId)) return; |
|
|
|
|
|
|
|
BaseApiUsageState usageState; |
|
|
|
List<TsKvEntry> updatedEntries; |
|
|
|
Map<ApiFeature, ApiUsageStateValue> result; |
|
|
|
|
|
|
|
updateLock.lock(); |
|
|
|
try { |
|
|
|
tenantState = getOrFetchState(tenantId); |
|
|
|
long ts = tenantState.getCurrentCycleTs(); |
|
|
|
long hourTs = tenantState.getCurrentHourTs(); |
|
|
|
usageState = getOrFetchState(tenantId, entityId); |
|
|
|
long ts = usageState.getCurrentCycleTs(); |
|
|
|
long hourTs = usageState.getCurrentHourTs(); |
|
|
|
long newHourTs = SchedulerUtils.getStartOfCurrentHour(); |
|
|
|
if (newHourTs != hourTs) { |
|
|
|
tenantState.setHour(newHourTs); |
|
|
|
usageState.setHour(newHourTs); |
|
|
|
} |
|
|
|
updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); |
|
|
|
Set<ApiFeature> apiFeatures = new HashSet<>(); |
|
|
|
for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) { |
|
|
|
for (UsageStatsKVProto kvProto : values) { |
|
|
|
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey()); |
|
|
|
long newValue = tenantState.add(recordKey, kvProto.getValue()); |
|
|
|
long newValue = usageState.add(recordKey, kvProto.getValue()); |
|
|
|
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue))); |
|
|
|
long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue()); |
|
|
|
long newHourlyValue = usageState.addToHourly(recordKey, kvProto.getValue()); |
|
|
|
updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue))); |
|
|
|
apiFeatures.add(recordKey.getApiFeature()); |
|
|
|
} |
|
|
|
result = tenantState.checkStateUpdatedDueToThreshold(apiFeatures); |
|
|
|
if (usageState.getEntityType() == EntityType.TENANT && !usageState.getEntityId().equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
result = ((TenantApiUsageState) usageState).checkStateUpdatedDueToThreshold(apiFeatures); |
|
|
|
} else { |
|
|
|
result = Collections.emptyMap(); |
|
|
|
} |
|
|
|
} finally { |
|
|
|
updateLock.unlock(); |
|
|
|
} |
|
|
|
tsWsService.saveAndNotifyInternal(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK); |
|
|
|
tsWsService.saveAndNotifyInternal(tenantId, usageState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK); |
|
|
|
if (!result.isEmpty()) { |
|
|
|
persistAndNotify(tenantState, result); |
|
|
|
persistAndNotify(usageState, result); |
|
|
|
} |
|
|
|
callback.onSuccess(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { |
|
|
|
if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) { |
|
|
|
myTenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition()); |
|
|
|
otherTenantStates.entrySet().removeIf(entry -> partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition()); |
|
|
|
myUsageStates.entrySet().removeIf(entry -> { |
|
|
|
return !partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); |
|
|
|
}); |
|
|
|
otherUsageStates.entrySet().removeIf(entry -> { |
|
|
|
return partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); |
|
|
|
}); |
|
|
|
initStatesFromDataBase(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public ApiUsageState getApiUsageState(TenantId tenantId) { |
|
|
|
TenantApiUsageState tenantState = myTenantStates.get(tenantId); |
|
|
|
TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId); |
|
|
|
if (tenantState != null) { |
|
|
|
return tenantState.getApiUsageState(); |
|
|
|
} else { |
|
|
|
ApiUsageState state = otherTenantStates.get(tenantId); |
|
|
|
ApiUsageState state = otherUsageStates.get(tenantId); |
|
|
|
if (state != null) { |
|
|
|
return state; |
|
|
|
} else { |
|
|
|
if (partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { |
|
|
|
return getOrFetchState(tenantId).getApiUsageState(); |
|
|
|
return getOrFetchState(tenantId, tenantId).getApiUsageState(); |
|
|
|
} else { |
|
|
|
updateLock.lock(); |
|
|
|
try { |
|
|
|
state = otherTenantStates.get(tenantId); |
|
|
|
state = otherUsageStates.get(tenantId); |
|
|
|
if (state == null) { |
|
|
|
state = apiUsageStateService.findTenantApiUsageState(tenantId); |
|
|
|
if (state != null) { |
|
|
|
otherTenantStates.put(tenantId, state); |
|
|
|
otherUsageStates.put(tenantId, state); |
|
|
|
} |
|
|
|
} |
|
|
|
} finally { |
|
|
|
@ -231,7 +260,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onApiUsageStateUpdate(TenantId tenantId) { |
|
|
|
otherTenantStates.remove(tenantId); |
|
|
|
otherUsageStates.remove(tenantId); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -240,11 +269,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
TenantProfile tenantProfile = tenantProfileCache.get(tenantProfileId); |
|
|
|
updateLock.lock(); |
|
|
|
try { |
|
|
|
myTenantStates.values().forEach(state -> { |
|
|
|
if (tenantProfile.getId().equals(state.getTenantProfileId())) { |
|
|
|
updateTenantState(state, tenantProfile); |
|
|
|
} |
|
|
|
}); |
|
|
|
myUsageStates.values().stream() |
|
|
|
.filter(state -> state.getEntityType() == EntityType.TENANT) |
|
|
|
.map(state -> (TenantApiUsageState) state) |
|
|
|
.forEach(state -> { |
|
|
|
if (tenantProfile.getId().equals(state.getTenantProfileId())) { |
|
|
|
updateTenantState(state, tenantProfile); |
|
|
|
} |
|
|
|
}); |
|
|
|
} finally { |
|
|
|
updateLock.unlock(); |
|
|
|
} |
|
|
|
@ -256,7 +288,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
TenantProfile tenantProfile = tenantProfileCache.get(tenantId); |
|
|
|
updateLock.lock(); |
|
|
|
try { |
|
|
|
TenantApiUsageState state = myTenantStates.get(tenantId); |
|
|
|
TenantApiUsageState state = (TenantApiUsageState) myUsageStates.get(tenantId); |
|
|
|
if (state != null && !state.getTenantProfileId().equals(tenantProfile.getId())) { |
|
|
|
updateTenantState(state, tenantProfile); |
|
|
|
} |
|
|
|
@ -293,29 +325,42 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void persistAndNotify(TenantApiUsageState state, Map<ApiFeature, ApiUsageStateValue> result) { |
|
|
|
log.info("[{}] Detected update of the API state: {}", state.getTenantId(), result); |
|
|
|
public void onTenantDelete(TenantId tenantId) { |
|
|
|
deletedEntities.add(tenantId); |
|
|
|
myUsageStates.remove(tenantId); |
|
|
|
otherUsageStates.remove(tenantId); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onCustomerDelete(CustomerId customerId) { |
|
|
|
deletedEntities.add(customerId); |
|
|
|
myUsageStates.remove(customerId); |
|
|
|
} |
|
|
|
|
|
|
|
private void persistAndNotify(BaseApiUsageState state, Map<ApiFeature, ApiUsageStateValue> result) { |
|
|
|
log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result); |
|
|
|
apiUsageStateService.update(state.getApiUsageState()); |
|
|
|
clusterService.onApiStateChange(state.getApiUsageState(), null); |
|
|
|
long ts = System.currentTimeMillis(); |
|
|
|
List<TsKvEntry> stateTelemetry = new ArrayList<>(); |
|
|
|
result.forEach(((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name()))))); |
|
|
|
result.forEach((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name())))); |
|
|
|
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK); |
|
|
|
|
|
|
|
String email = tenantService.findTenantById(state.getTenantId()).getEmail(); |
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(email)) { |
|
|
|
result.forEach((apiFeature, stateValue) -> { |
|
|
|
mailExecutor.submit(() -> { |
|
|
|
try { |
|
|
|
mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage(state, apiFeature, stateValue)); |
|
|
|
} catch (ThingsboardException e) { |
|
|
|
log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e); |
|
|
|
} |
|
|
|
if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
String email = tenantService.findTenantById(state.getTenantId()).getEmail(); |
|
|
|
if (StringUtils.isNotEmpty(email)) { |
|
|
|
result.forEach((apiFeature, stateValue) -> { |
|
|
|
mailExecutor.submit(() -> { |
|
|
|
try { |
|
|
|
mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage((TenantApiUsageState) state, apiFeature, stateValue)); |
|
|
|
} catch (ThingsboardException e) { |
|
|
|
log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e); |
|
|
|
} |
|
|
|
}); |
|
|
|
}); |
|
|
|
}); |
|
|
|
} else { |
|
|
|
log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId()); |
|
|
|
} else { |
|
|
|
log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -350,12 +395,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
updateLock.lock(); |
|
|
|
try { |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
myTenantStates.values().forEach(state -> { |
|
|
|
myUsageStates.values().forEach(state -> { |
|
|
|
if ((state.getNextCycleTs() < now) && (now - state.getNextCycleTs() < TimeUnit.HOURS.toMillis(1))) { |
|
|
|
TenantId tenantId = state.getTenantId(); |
|
|
|
state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth()); |
|
|
|
saveNewCounts(state, Arrays.asList(ApiUsageRecordKey.values())); |
|
|
|
updateTenantState(state, tenantProfileCache.get(tenantId)); |
|
|
|
if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
TenantId tenantId = state.getTenantId(); |
|
|
|
updateTenantState((TenantApiUsageState) state, tenantProfileCache.get(tenantId)); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
} finally { |
|
|
|
@ -363,7 +410,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void saveNewCounts(TenantApiUsageState state, List<ApiUsageRecordKey> keys) { |
|
|
|
private void saveNewCounts(BaseApiUsageState state, List<ApiUsageRecordKey> keys) { |
|
|
|
List<TsKvEntry> counts = keys.stream() |
|
|
|
.map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L))) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
@ -371,52 +418,66 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK); |
|
|
|
} |
|
|
|
|
|
|
|
private TenantApiUsageState getOrFetchState(TenantId tenantId) { |
|
|
|
TenantApiUsageState tenantState = myTenantStates.get(tenantId); |
|
|
|
if (tenantState == null) { |
|
|
|
ApiUsageState dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); |
|
|
|
if (dbStateEntity == null) { |
|
|
|
try { |
|
|
|
dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId); |
|
|
|
} catch (Exception e) { |
|
|
|
dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); |
|
|
|
} |
|
|
|
} |
|
|
|
TenantProfile tenantProfile = tenantProfileCache.get(tenantId); |
|
|
|
tenantState = new TenantApiUsageState(tenantProfile, dbStateEntity); |
|
|
|
List<ApiUsageRecordKey> newCounts = new ArrayList<>(); |
|
|
|
private BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId entityId) { |
|
|
|
if (entityId == null || entityId.isNullUid()) { |
|
|
|
entityId = tenantId; |
|
|
|
} |
|
|
|
BaseApiUsageState state = myUsageStates.get(entityId); |
|
|
|
if (state != null) { |
|
|
|
return state; |
|
|
|
} |
|
|
|
|
|
|
|
ApiUsageState storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId); |
|
|
|
if (storedState == null) { |
|
|
|
try { |
|
|
|
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getId()).get(); |
|
|
|
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { |
|
|
|
boolean cycleEntryFound = false; |
|
|
|
boolean hourlyEntryFound = false; |
|
|
|
for (TsKvEntry tsKvEntry : dbValues) { |
|
|
|
if (tsKvEntry.getKey().equals(key.getApiCountKey())) { |
|
|
|
cycleEntryFound = true; |
|
|
|
|
|
|
|
boolean oldCount = tsKvEntry.getTs() == tenantState.getCurrentCycleTs(); |
|
|
|
tenantState.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
|
|
|
|
if (!oldCount) { |
|
|
|
newCounts.add(key); |
|
|
|
} |
|
|
|
} else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) { |
|
|
|
hourlyEntryFound = true; |
|
|
|
tenantState.putHourly(key, tsKvEntry.getTs() == tenantState.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
} |
|
|
|
if (cycleEntryFound && hourlyEntryFound) { |
|
|
|
break; |
|
|
|
storedState = apiUsageStateService.createDefaultApiUsageState(tenantId, entityId); |
|
|
|
} catch (Exception e) { |
|
|
|
storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId); |
|
|
|
} |
|
|
|
} |
|
|
|
if (entityId.getEntityType() == EntityType.TENANT) { |
|
|
|
if (!entityId.equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
state = new TenantApiUsageState(tenantProfileCache.get((TenantId) entityId), storedState); |
|
|
|
} else { |
|
|
|
state = new TenantApiUsageState(storedState); |
|
|
|
} |
|
|
|
} else { |
|
|
|
state = new CustomerApiUsageState(storedState); |
|
|
|
} |
|
|
|
|
|
|
|
List<ApiUsageRecordKey> newCounts = new ArrayList<>(); |
|
|
|
try { |
|
|
|
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, storedState.getId()).get(); |
|
|
|
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { |
|
|
|
boolean cycleEntryFound = false; |
|
|
|
boolean hourlyEntryFound = false; |
|
|
|
for (TsKvEntry tsKvEntry : dbValues) { |
|
|
|
if (tsKvEntry.getKey().equals(key.getApiCountKey())) { |
|
|
|
cycleEntryFound = true; |
|
|
|
|
|
|
|
boolean oldCount = tsKvEntry.getTs() == state.getCurrentCycleTs(); |
|
|
|
state.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
|
|
|
|
if (!oldCount) { |
|
|
|
newCounts.add(key); |
|
|
|
} |
|
|
|
} else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) { |
|
|
|
hourlyEntryFound = true; |
|
|
|
state.putHourly(key, tsKvEntry.getTs() == state.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
} |
|
|
|
if (cycleEntryFound && hourlyEntryFound) { |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
log.debug("[{}] Initialized state: {}", tenantId, dbStateEntity); |
|
|
|
myTenantStates.put(tenantId, tenantState); |
|
|
|
saveNewCounts(tenantState, newCounts); |
|
|
|
} catch (InterruptedException | ExecutionException e) { |
|
|
|
log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); |
|
|
|
} |
|
|
|
log.debug("[{}] Initialized state: {}", entityId, storedState); |
|
|
|
myUsageStates.put(entityId, state); |
|
|
|
saveNewCounts(state, newCounts); |
|
|
|
} catch (InterruptedException | ExecutionException e) { |
|
|
|
log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); |
|
|
|
} |
|
|
|
return tenantState; |
|
|
|
|
|
|
|
return state; |
|
|
|
} |
|
|
|
|
|
|
|
private void initStatesFromDataBase() { |
|
|
|
@ -429,11 +490,11 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); |
|
|
|
List<Future<?>> futures = new ArrayList<>(); |
|
|
|
for (Tenant tenant : tenantIterator) { |
|
|
|
if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { |
|
|
|
if (!myUsageStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { |
|
|
|
log.debug("[{}] Initializing tenant state.", tenant.getId()); |
|
|
|
futures.add(tmpInitExecutor.submit(() -> { |
|
|
|
try { |
|
|
|
updateTenantState(getOrFetchState(tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); |
|
|
|
updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); |
|
|
|
log.debug("[{}] Initialized tenant state.", tenant.getId()); |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); |
|
|
|
|