diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 8a50c9d62b..1b6cdf3cff 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -16,9 +16,11 @@ package org.thingsboard.server.service.apiusage; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -30,69 +32,144 @@ import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.profile.TbTenantProfileCache; -import java.time.LocalDate; -import java.time.ZoneId; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @TbCoreComponent @Service public class DefaultTbApiUsageStateService implements TbApiUsageStateService { + public static final String HOURLY = "HOURLY_"; private final ApiUsageStateService apiUsageStateService; private final TimeseriesService tsService; - private final ZoneId zoneId; + private final SchedulerComponent scheduler; + private final TbTenantProfileCache tenantProfileCache; private final Map tenantStates = new ConcurrentHashMap<>(); - public DefaultTbApiUsageStateService(ApiUsageStateService apiUsageStateService, TimeseriesService tsService) { + @Value("${usage.stats.report.enabled:true}") + private boolean enabled; + + @Value("${usage.stats.check.cycle:60000}") + private long nextCycleCheckInterval; + + private final Lock updateLock = new ReentrantLock(); + + public DefaultTbApiUsageStateService(ApiUsageStateService apiUsageStateService, TimeseriesService tsService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache) { this.apiUsageStateService = apiUsageStateService; this.tsService = tsService; - this.zoneId = SchedulerUtils.getZoneId("UTC"); + this.scheduler = scheduler; + this.tenantProfileCache = tenantProfileCache; + } + + @PostConstruct + public void init() { + if (enabled) { + scheduler.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS); + } } @Override public void process(TbProtoQueueMsg msg, TbCallback callback) { ToUsageStatsServiceMsg statsMsg = msg.getValue(); TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); - TenantApiUsageState tenantState = getOrFetchState(tenantId); - long ts = tenantState.getCurrentMonthTs(); - List updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); - for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) { - ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey()); - long newValue = tenantState.add(recordKey, kvProto.getValue()); - updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue))); + TenantApiUsageState tenantState; + List updatedEntries; + updateLock.lock(); + try { + tenantState = getOrFetchState(tenantId); + long ts = tenantState.getCurrentCycleTs(); + long hourTs = tenantState.getCurrentHourTs(); + long newHourTs = SchedulerUtils.getStartOfCurrentHour(); + if (newHourTs != hourTs) { + tenantState.setHour(newHourTs); + } + updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); + for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) { + ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey()); + long newValue = tenantState.add(recordKey, kvProto.getValue()); + updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue))); + newValue = tenantState.addToHourly(recordKey, kvProto.getValue()); + updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newValue))); + } + } finally { + updateLock.unlock(); } tsService.save(tenantId, tenantState.getEntityId(), updatedEntries, 0L); } + @Override + public TenantApiUsageState getApiUsageState(TenantId tenantId) { + return null; + } + + @Override + public void onAddedToAllowList(TenantId tenantId) { + + } + + @Override + public void onAddedToDenyList(TenantId tenantId) { + + } + + private void checkStartOfNextCycle() { + updateLock.lock(); + try { + long now = System.currentTimeMillis(); + tenantStates.values().forEach(state -> { + if ((state.getNextCycleTs() > now) && (state.getNextCycleTs() - now < TimeUnit.HOURS.toMillis(1))) { + state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth()); + } + }); + } finally { + updateLock.unlock(); + } + } + private TenantApiUsageState getOrFetchState(TenantId tenantId) { TenantApiUsageState tenantState = tenantStates.get(tenantId); if (tenantState == null) { - long currentMonthTs = LocalDate.now().withDayOfMonth(1).atStartOfDay(zoneId).toInstant().toEpochMilli(); ApiUsageState dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); - tenantState = new TenantApiUsageState(currentMonthTs, dbStateEntity.getEntityId()); + if (dbStateEntity == null) { + try { + dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId); + } catch (Exception e) { + dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); + } + } + tenantState = new TenantApiUsageState(dbStateEntity.getEntityId()); try { List dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getEntityId()).get(); for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - TsKvEntry keyEntry = null; + boolean cycleEntryFound = false; + boolean hourlyEntryFound = false; for (TsKvEntry tsKvEntry : dbValues) { if (tsKvEntry.getKey().equals(key.name())) { - keyEntry = tsKvEntry; + cycleEntryFound = true; + tenantState.put(key, tsKvEntry.getLongValue().get()); + } else if (tsKvEntry.getKey().equals(HOURLY + key.name())) { + hourlyEntryFound = true; + if (tsKvEntry.getTs() == tenantState.getCurrentHourTs()) { + tenantState.putHourly(key, tsKvEntry.getLongValue().get()); + } + } + if (cycleEntryFound && hourlyEntryFound) { break; } } - if (keyEntry != null) { - tenantState.put(key, keyEntry.getLongValue().get()); - } else { - tenantState.put(key, 0L); - } } tenantStates.put(tenantId, tenantState); } catch (InterruptedException | ExecutionException e) { diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index b5aeedeb06..e79b79d069 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -15,11 +15,19 @@ */ package org.thingsboard.server.service.apiusage; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; public interface TbApiUsageStateService { - void process(TbProtoQueueMsg msg, TbCallback callback); + void process(TbProtoQueueMsg msg, TbCallback callback); + + TenantApiUsageState getApiUsageState(TenantId tenantId); + + void onAddedToAllowList(TenantId tenantId); + + void onAddedToDenyList(TenantId tenantId); + } diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java index a8be19914b..29335c3326 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java @@ -18,30 +18,65 @@ package org.thingsboard.server.service.apiusage; import lombok.Getter; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.msg.tools.SchedulerUtils; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class TenantApiUsageState { - private final Map values = new ConcurrentHashMap<>(); + private final Map currentCycleValues = new ConcurrentHashMap<>(); + private final Map currentHourValues = new ConcurrentHashMap<>(); + @Getter private final EntityId entityId; @Getter - private volatile long currentMonthTs; + private volatile long currentCycleTs; + @Getter + private volatile long nextCycleTs; + @Getter + private volatile long currentHourTs; - public TenantApiUsageState(long currentMonthTs, EntityId entityId) { + public TenantApiUsageState(EntityId entityId) { this.entityId = entityId; - this.currentMonthTs = currentMonthTs; + this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth(); + this.nextCycleTs = SchedulerUtils.getStartOfNextMonth(); + this.currentHourTs = SchedulerUtils.getStartOfCurrentHour(); } public void put(ApiUsageRecordKey key, Long value) { - values.put(key, value); + currentCycleValues.put(key, value); + } + + public void putHourly(ApiUsageRecordKey key, Long value) { + currentHourValues.put(key, value); } public long add(ApiUsageRecordKey key, long value) { - long result = values.getOrDefault(key, 0L) + value; - values.put(key, result); + long result = currentCycleValues.getOrDefault(key, 0L) + value; + currentCycleValues.put(key, result); return result; } + + public long addToHourly(ApiUsageRecordKey key, long value) { + long result = currentHourValues.getOrDefault(key, 0L) + value; + currentHourValues.put(key, result); + return result; + } + + public void setHour(long currentHourTs) { + this.currentHourTs = currentHourTs; + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + currentHourValues.put(key, 0L); + } + } + + public void setCycles(long currentCycleTs, long nextCycleTs) { + this.currentCycleTs = currentCycleTs; + this.nextCycleTs = nextCycleTs; + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + currentCycleValues.put(key, 0L); + } + } + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java index ad6c73f63e..c39e2a69dc 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java @@ -24,5 +24,5 @@ public interface ApiUsageStateService { void deleteApiUsageStateByTenantId(TenantId tenantId); - void createDefaultApiUsageState(TenantId id); + ApiUsageState createDefaultApiUsageState(TenantId id); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java index 87adc31713..1f242a0f88 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java @@ -21,5 +21,5 @@ import java.io.Serializable; * @author Andrew Shvayka */ public enum ComponentLifecycleEvent implements Serializable { - CREATED, STARTED, ACTIVATED, SUSPENDED, UPDATED, STOPPED, DELETED + CREATED, STARTED, ACTIVATED, SUSPENDED, UPDATED, STOPPED, DELETED, ADDED_TO_ALLOW_LIST, ADDED_TO_DENY_LIST } \ No newline at end of file diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/SchedulerUtils.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/SchedulerUtils.java index fea6fcb337..421066b30b 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/SchedulerUtils.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/SchedulerUtils.java @@ -15,16 +15,64 @@ */ package org.thingsboard.server.common.msg.tools; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAdjuster; +import java.time.temporal.TemporalAdjusters; +import java.time.temporal.TemporalUnit; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static java.time.temporal.ChronoField.DAY_OF_MONTH; +import static java.time.temporal.ChronoUnit.MONTHS; + public class SchedulerUtils { + private final static ZoneId UTC = ZoneId.of("UTC"); private static final ConcurrentMap tzMap = new ConcurrentHashMap<>(); public static ZoneId getZoneId(String tz) { return tzMap.computeIfAbsent(tz == null || tz.isEmpty() ? "UTC" : tz, ZoneId::of); } + public static long getStartOfCurrentHour() { + return getStartOfCurrentHour(UTC); + } + + public static long getStartOfCurrentHour(ZoneId zoneId) { + return LocalDateTime.now(ZoneOffset.UTC).atZone(zoneId).truncatedTo(ChronoUnit.HOURS).toInstant().toEpochMilli(); + } + + public static long getStartOfCurrentMonth() { + return getStartOfCurrentMonth(UTC); + } + + public static long getStartOfCurrentMonth(ZoneId zoneId) { + return LocalDate.now().withDayOfMonth(1).atStartOfDay(zoneId).toInstant().toEpochMilli(); + } + + public static long getStartOfNextMonth() { + return getStartOfNextMonth(UTC); + } + + public static long getStartOfNextMonth(ZoneId zoneId) { + return LocalDate.now().with(TemporalAdjusters.firstDayOfNextMonth()).atStartOfDay(zoneId).toInstant().toEpochMilli(); + } + + public static long getStartOfNextNextMonth() { + return getStartOfNextNextMonth(UTC); + } + + public static long getStartOfNextNextMonth(ZoneId zoneId) { + return LocalDate.now().with(firstDayOfNextNextMonth()).atStartOfDay(zoneId).toInstant().toEpochMilli(); + } + + public static TemporalAdjuster firstDayOfNextNextMonth() { + return (temporal) -> temporal.with(DAY_OF_MONTH, 1).plus(2, MONTHS); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java index 9e543a8f7a..6be56a1815 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java @@ -49,14 +49,14 @@ public class ApiApiUsageStateServiceImpl extends AbstractEntityService implement } @Override - public void createDefaultApiUsageState(TenantId tenantId) { + public ApiUsageState createDefaultApiUsageState(TenantId tenantId) { log.trace("Executing createDefaultUsageRecord [{}]", tenantId); validateId(tenantId, INCORRECT_TENANT_ID + tenantId); ApiUsageState apiUsageState = new ApiUsageState(); apiUsageState.setTenantId(tenantId); apiUsageState.setEntityId(tenantId); apiUsageStateValidator.validate(apiUsageState, ApiUsageState::getTenantId); - apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState); + return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState); } @Override