Browse Source

Improvements to ApiUsageState persistence

pull/3615/head
Andrii Shvaika 6 years ago
parent
commit
9ec4b77672
  1. 119
      application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java
  2. 12
      application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java
  3. 49
      application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java
  4. 2
      common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java
  5. 2
      common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java
  6. 48
      common/message/src/main/java/org/thingsboard/server/common/msg/tools/SchedulerUtils.java
  7. 4
      dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java

119
application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java

@ -16,9 +16,11 @@
package org.thingsboard.server.service.apiusage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
@ -30,69 +32,144 @@ import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import java.time.LocalDate;
import java.time.ZoneId;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@TbCoreComponent
@Service
public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
public static final String HOURLY = "HOURLY_";
private final ApiUsageStateService apiUsageStateService;
private final TimeseriesService tsService;
private final ZoneId zoneId;
private final SchedulerComponent scheduler;
private final TbTenantProfileCache tenantProfileCache;
private final Map<TenantId, TenantApiUsageState> tenantStates = new ConcurrentHashMap<>();
public DefaultTbApiUsageStateService(ApiUsageStateService apiUsageStateService, TimeseriesService tsService) {
@Value("${usage.stats.report.enabled:true}")
private boolean enabled;
@Value("${usage.stats.check.cycle:60000}")
private long nextCycleCheckInterval;
private final Lock updateLock = new ReentrantLock();
public DefaultTbApiUsageStateService(ApiUsageStateService apiUsageStateService, TimeseriesService tsService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache) {
this.apiUsageStateService = apiUsageStateService;
this.tsService = tsService;
this.zoneId = SchedulerUtils.getZoneId("UTC");
this.scheduler = scheduler;
this.tenantProfileCache = tenantProfileCache;
}
@PostConstruct
public void init() {
if (enabled) {
scheduler.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS);
}
}
@Override
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
ToUsageStatsServiceMsg statsMsg = msg.getValue();
TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
TenantApiUsageState tenantState = getOrFetchState(tenantId);
long ts = tenantState.getCurrentMonthTs();
List<TsKvEntry> updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) {
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey());
long newValue = tenantState.add(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue)));
TenantApiUsageState tenantState;
List<TsKvEntry> updatedEntries;
updateLock.lock();
try {
tenantState = getOrFetchState(tenantId);
long ts = tenantState.getCurrentCycleTs();
long hourTs = tenantState.getCurrentHourTs();
long newHourTs = SchedulerUtils.getStartOfCurrentHour();
if (newHourTs != hourTs) {
tenantState.setHour(newHourTs);
}
updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length);
for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) {
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey());
long newValue = tenantState.add(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue)));
newValue = tenantState.addToHourly(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newValue)));
}
} finally {
updateLock.unlock();
}
tsService.save(tenantId, tenantState.getEntityId(), updatedEntries, 0L);
}
@Override
public TenantApiUsageState getApiUsageState(TenantId tenantId) {
return null;
}
@Override
public void onAddedToAllowList(TenantId tenantId) {
}
@Override
public void onAddedToDenyList(TenantId tenantId) {
}
private void checkStartOfNextCycle() {
updateLock.lock();
try {
long now = System.currentTimeMillis();
tenantStates.values().forEach(state -> {
if ((state.getNextCycleTs() > now) && (state.getNextCycleTs() - now < TimeUnit.HOURS.toMillis(1))) {
state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth());
}
});
} finally {
updateLock.unlock();
}
}
private TenantApiUsageState getOrFetchState(TenantId tenantId) {
TenantApiUsageState tenantState = tenantStates.get(tenantId);
if (tenantState == null) {
long currentMonthTs = LocalDate.now().withDayOfMonth(1).atStartOfDay(zoneId).toInstant().toEpochMilli();
ApiUsageState dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId);
tenantState = new TenantApiUsageState(currentMonthTs, dbStateEntity.getEntityId());
if (dbStateEntity == null) {
try {
dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId);
} catch (Exception e) {
dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId);
}
}
tenantState = new TenantApiUsageState(dbStateEntity.getEntityId());
try {
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getEntityId()).get();
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
TsKvEntry keyEntry = null;
boolean cycleEntryFound = false;
boolean hourlyEntryFound = false;
for (TsKvEntry tsKvEntry : dbValues) {
if (tsKvEntry.getKey().equals(key.name())) {
keyEntry = tsKvEntry;
cycleEntryFound = true;
tenantState.put(key, tsKvEntry.getLongValue().get());
} else if (tsKvEntry.getKey().equals(HOURLY + key.name())) {
hourlyEntryFound = true;
if (tsKvEntry.getTs() == tenantState.getCurrentHourTs()) {
tenantState.putHourly(key, tsKvEntry.getLongValue().get());
}
}
if (cycleEntryFound && hourlyEntryFound) {
break;
}
}
if (keyEntry != null) {
tenantState.put(key, keyEntry.getLongValue().get());
} else {
tenantState.put(key, 0L);
}
}
tenantStates.put(tenantId, tenantState);
} catch (InterruptedException | ExecutionException e) {

12
application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java

@ -15,11 +15,19 @@
*/
package org.thingsboard.server.service.apiusage;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface TbApiUsageStateService {
void process(TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg> msg, TbCallback callback);
void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback);
TenantApiUsageState getApiUsageState(TenantId tenantId);
void onAddedToAllowList(TenantId tenantId);
void onAddedToDenyList(TenantId tenantId);
}

49
application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java

@ -18,30 +18,65 @@ package org.thingsboard.server.service.apiusage;
import lombok.Getter;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TenantApiUsageState {
private final Map<ApiUsageRecordKey, Long> values = new ConcurrentHashMap<>();
private final Map<ApiUsageRecordKey, Long> currentCycleValues = new ConcurrentHashMap<>();
private final Map<ApiUsageRecordKey, Long> currentHourValues = new ConcurrentHashMap<>();
@Getter
private final EntityId entityId;
@Getter
private volatile long currentMonthTs;
private volatile long currentCycleTs;
@Getter
private volatile long nextCycleTs;
@Getter
private volatile long currentHourTs;
public TenantApiUsageState(long currentMonthTs, EntityId entityId) {
public TenantApiUsageState(EntityId entityId) {
this.entityId = entityId;
this.currentMonthTs = currentMonthTs;
this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth();
this.nextCycleTs = SchedulerUtils.getStartOfNextMonth();
this.currentHourTs = SchedulerUtils.getStartOfCurrentHour();
}
public void put(ApiUsageRecordKey key, Long value) {
values.put(key, value);
currentCycleValues.put(key, value);
}
public void putHourly(ApiUsageRecordKey key, Long value) {
currentHourValues.put(key, value);
}
public long add(ApiUsageRecordKey key, long value) {
long result = values.getOrDefault(key, 0L) + value;
values.put(key, result);
long result = currentCycleValues.getOrDefault(key, 0L) + value;
currentCycleValues.put(key, result);
return result;
}
public long addToHourly(ApiUsageRecordKey key, long value) {
long result = currentHourValues.getOrDefault(key, 0L) + value;
currentHourValues.put(key, result);
return result;
}
public void setHour(long currentHourTs) {
this.currentHourTs = currentHourTs;
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
currentHourValues.put(key, 0L);
}
}
public void setCycles(long currentCycleTs, long nextCycleTs) {
this.currentCycleTs = currentCycleTs;
this.nextCycleTs = nextCycleTs;
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
currentCycleValues.put(key, 0L);
}
}
}

2
common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java

@ -24,5 +24,5 @@ public interface ApiUsageStateService {
void deleteApiUsageStateByTenantId(TenantId tenantId);
void createDefaultApiUsageState(TenantId id);
ApiUsageState createDefaultApiUsageState(TenantId id);
}

2
common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java

@ -21,5 +21,5 @@ import java.io.Serializable;
* @author Andrew Shvayka
*/
public enum ComponentLifecycleEvent implements Serializable {
CREATED, STARTED, ACTIVATED, SUSPENDED, UPDATED, STOPPED, DELETED
CREATED, STARTED, ACTIVATED, SUSPENDED, UPDATED, STOPPED, DELETED, ADDED_TO_ALLOW_LIST, ADDED_TO_DENY_LIST
}

48
common/message/src/main/java/org/thingsboard/server/common/msg/tools/SchedulerUtils.java

@ -15,16 +15,64 @@
*/
package org.thingsboard.server.common.msg.tools;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjuster;
import java.time.temporal.TemporalAdjusters;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoUnit.MONTHS;
public class SchedulerUtils {
private final static ZoneId UTC = ZoneId.of("UTC");
private static final ConcurrentMap<String, ZoneId> tzMap = new ConcurrentHashMap<>();
public static ZoneId getZoneId(String tz) {
return tzMap.computeIfAbsent(tz == null || tz.isEmpty() ? "UTC" : tz, ZoneId::of);
}
public static long getStartOfCurrentHour() {
return getStartOfCurrentHour(UTC);
}
public static long getStartOfCurrentHour(ZoneId zoneId) {
return LocalDateTime.now(ZoneOffset.UTC).atZone(zoneId).truncatedTo(ChronoUnit.HOURS).toInstant().toEpochMilli();
}
public static long getStartOfCurrentMonth() {
return getStartOfCurrentMonth(UTC);
}
public static long getStartOfCurrentMonth(ZoneId zoneId) {
return LocalDate.now().withDayOfMonth(1).atStartOfDay(zoneId).toInstant().toEpochMilli();
}
public static long getStartOfNextMonth() {
return getStartOfNextMonth(UTC);
}
public static long getStartOfNextMonth(ZoneId zoneId) {
return LocalDate.now().with(TemporalAdjusters.firstDayOfNextMonth()).atStartOfDay(zoneId).toInstant().toEpochMilli();
}
public static long getStartOfNextNextMonth() {
return getStartOfNextNextMonth(UTC);
}
public static long getStartOfNextNextMonth(ZoneId zoneId) {
return LocalDate.now().with(firstDayOfNextNextMonth()).atStartOfDay(zoneId).toInstant().toEpochMilli();
}
public static TemporalAdjuster firstDayOfNextNextMonth() {
return (temporal) -> temporal.with(DAY_OF_MONTH, 1).plus(2, MONTHS);
}
}

4
dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java

@ -49,14 +49,14 @@ public class ApiApiUsageStateServiceImpl extends AbstractEntityService implement
}
@Override
public void createDefaultApiUsageState(TenantId tenantId) {
public ApiUsageState createDefaultApiUsageState(TenantId tenantId) {
log.trace("Executing createDefaultUsageRecord [{}]", tenantId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
ApiUsageState apiUsageState = new ApiUsageState();
apiUsageState.setTenantId(tenantId);
apiUsageState.setEntityId(tenantId);
apiUsageStateValidator.validate(apiUsageState, ApiUsageState::getTenantId);
apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState);
return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState);
}
@Override

Loading…
Cancel
Save