|
|
|
@ -61,6 +61,7 @@ import org.thingsboard.server.service.telemetry.InternalTelemetryService; |
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
@ -73,6 +74,7 @@ import java.util.concurrent.Executors; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.locks.Lock; |
|
|
|
import java.util.concurrent.locks.ReentrantLock; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
@Service |
|
|
|
@ -347,16 +349,11 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { |
|
|
|
try { |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
myTenantStates.values().forEach(state -> { |
|
|
|
if ((state.getNextCycleTs() > now) && (state.getNextCycleTs() - now < TimeUnit.HOURS.toMillis(1))) { |
|
|
|
if ((state.getNextCycleTs() < now) && (now - state.getNextCycleTs() < TimeUnit.HOURS.toMillis(1))) { |
|
|
|
TenantId tenantId = state.getTenantId(); |
|
|
|
state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth()); |
|
|
|
ToUsageStatsServiceMsg.Builder msg = ToUsageStatsServiceMsg.newBuilder(); |
|
|
|
msg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); |
|
|
|
msg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); |
|
|
|
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { |
|
|
|
msg.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(0).build()); |
|
|
|
} |
|
|
|
process(new TbProtoQueueMsg<>(UUID.randomUUID(), msg.build()), TbCallback.EMPTY); |
|
|
|
saveNewCounts(state, Arrays.asList(ApiUsageRecordKey.values())); |
|
|
|
updateTenantState(state, tenantProfileCache.get(tenantId)); |
|
|
|
} |
|
|
|
}); |
|
|
|
} finally { |
|
|
|
@ -364,6 +361,14 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void saveNewCounts(TenantApiUsageState state, List<ApiUsageRecordKey> keys) { |
|
|
|
List<TsKvEntry> counts = keys.stream() |
|
|
|
.map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L))) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
|
|
|
|
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK); |
|
|
|
} |
|
|
|
|
|
|
|
private TenantApiUsageState getOrFetchState(TenantId tenantId) { |
|
|
|
TenantApiUsageState tenantState = myTenantStates.get(tenantId); |
|
|
|
if (tenantState == null) { |
|
|
|
@ -377,6 +382,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { |
|
|
|
} |
|
|
|
TenantProfile tenantProfile = tenantProfileCache.get(tenantId); |
|
|
|
tenantState = new TenantApiUsageState(tenantProfile, dbStateEntity); |
|
|
|
List<ApiUsageRecordKey> newCounts = new ArrayList<>(); |
|
|
|
try { |
|
|
|
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getId()).get(); |
|
|
|
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { |
|
|
|
@ -385,7 +391,13 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { |
|
|
|
for (TsKvEntry tsKvEntry : dbValues) { |
|
|
|
if (tsKvEntry.getKey().equals(key.getApiCountKey())) { |
|
|
|
cycleEntryFound = true; |
|
|
|
tenantState.put(key, tsKvEntry.getTs() == tenantState.getCurrentCycleTs() ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
|
|
|
|
boolean oldCount = tsKvEntry.getTs() == tenantState.getCurrentCycleTs(); |
|
|
|
tenantState.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
|
|
|
|
if (!oldCount) { |
|
|
|
newCounts.add(key); |
|
|
|
} |
|
|
|
} else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) { |
|
|
|
hourlyEntryFound = true; |
|
|
|
tenantState.putHourly(key, tsKvEntry.getTs() == tenantState.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
@ -397,6 +409,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { |
|
|
|
} |
|
|
|
log.debug("[{}] Initialized state: {}", tenantId, dbStateEntity); |
|
|
|
myTenantStates.put(tenantId, tenantState); |
|
|
|
saveNewCounts(tenantState, newCounts); |
|
|
|
} catch (InterruptedException | ExecutionException e) { |
|
|
|
log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); |
|
|
|
} |
|
|
|
|