|
|
|
@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; |
|
|
|
import org.thingsboard.server.common.msg.queue.ServiceType; |
|
|
|
import org.thingsboard.server.common.msg.queue.TbCallback; |
|
|
|
import org.thingsboard.server.common.msg.tools.SchedulerUtils; |
|
|
|
import org.thingsboard.server.dao.customer.CustomerService; |
|
|
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
|
|
|
import org.thingsboard.server.dao.tenant.TenantService; |
|
|
|
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|
|
|
@ -66,10 +67,10 @@ import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
@ -99,6 +100,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
private final TbClusterService clusterService; |
|
|
|
private final PartitionService partitionService; |
|
|
|
private final TenantService tenantService; |
|
|
|
private final CustomerService customerService; |
|
|
|
private final TimeseriesService tsService; |
|
|
|
private final ApiUsageStateService apiUsageStateService; |
|
|
|
private final SchedulerComponent scheduler; |
|
|
|
@ -127,13 +129,16 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
public DefaultTbApiUsageStateService(TbClusterService clusterService, |
|
|
|
PartitionService partitionService, |
|
|
|
TenantService tenantService, |
|
|
|
CustomerService customerService, |
|
|
|
TimeseriesService tsService, |
|
|
|
ApiUsageStateService apiUsageStateService, |
|
|
|
SchedulerComponent scheduler, |
|
|
|
TbTenantProfileCache tenantProfileCache, MailService mailService) { |
|
|
|
TbTenantProfileCache tenantProfileCache, |
|
|
|
MailService mailService) { |
|
|
|
this.clusterService = clusterService; |
|
|
|
this.partitionService = partitionService; |
|
|
|
this.tenantService = tenantService; |
|
|
|
this.customerService = customerService; |
|
|
|
this.tsService = tsService; |
|
|
|
this.apiUsageStateService = apiUsageStateService; |
|
|
|
this.scheduler = scheduler; |
|
|
|
@ -154,21 +159,20 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
@Override |
|
|
|
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) { |
|
|
|
ToUsageStatsServiceMsg statsMsg = msg.getValue(); |
|
|
|
|
|
|
|
TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); |
|
|
|
CustomerId customerId; |
|
|
|
EntityId initiatorId; |
|
|
|
if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) { |
|
|
|
customerId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB())); |
|
|
|
initiatorId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB())); |
|
|
|
} else { |
|
|
|
customerId = new CustomerId(EntityId.NULL_UUID); |
|
|
|
initiatorId = tenantId; |
|
|
|
} |
|
|
|
|
|
|
|
processEntityUsageStats(tenantId, customerId.isNullUid() ? tenantId : customerId, statsMsg.getValuesList()); |
|
|
|
processEntityUsageStats(tenantId, initiatorId, statsMsg.getValuesList()); |
|
|
|
callback.onSuccess(); |
|
|
|
} |
|
|
|
|
|
|
|
private void processEntityUsageStats(TenantId tenantId, EntityId entityId, List<UsageStatsKVProto> values) { |
|
|
|
if (tenantProfileCache.get(tenantId) == null) return; |
|
|
|
|
|
|
|
BaseApiUsageState usageState; |
|
|
|
List<TsKvEntry> updatedEntries; |
|
|
|
Map<ApiFeature, ApiUsageStateValue> result; |
|
|
|
@ -192,7 +196,11 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue))); |
|
|
|
apiFeatures.add(recordKey.getApiFeature()); |
|
|
|
} |
|
|
|
result = usageState.checkStateUpdatedDueToThreshold(apiFeatures); |
|
|
|
if (usageState.getEntityType() == EntityType.TENANT && !usageState.getEntityId().equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
result = ((TenantApiUsageState) usageState).checkStateUpdatedDueToThreshold(apiFeatures); |
|
|
|
} else { |
|
|
|
result = Collections.emptyMap(); |
|
|
|
} |
|
|
|
} finally { |
|
|
|
updateLock.unlock(); |
|
|
|
} |
|
|
|
@ -226,7 +234,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
return state; |
|
|
|
} else { |
|
|
|
if (partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { |
|
|
|
return getOrFetchState(tenantId).getApiUsageState(); |
|
|
|
return getOrFetchState(tenantId, tenantId).getApiUsageState(); |
|
|
|
} else { |
|
|
|
updateLock.lock(); |
|
|
|
try { |
|
|
|
@ -319,10 +327,10 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
clusterService.onApiStateChange(state.getApiUsageState(), null); |
|
|
|
long ts = System.currentTimeMillis(); |
|
|
|
List<TsKvEntry> stateTelemetry = new ArrayList<>(); |
|
|
|
result.forEach(((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name()))))); |
|
|
|
result.forEach((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name())))); |
|
|
|
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK); |
|
|
|
|
|
|
|
if (state.getEntityType() == EntityType.TENANT) { |
|
|
|
if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
String email = tenantService.findTenantById(state.getTenantId()).getEmail(); |
|
|
|
if (StringUtils.isNotEmpty(email)) { |
|
|
|
result.forEach((apiFeature, stateValue) -> { |
|
|
|
@ -373,11 +381,12 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
myUsageStates.values().forEach(state -> { |
|
|
|
if ((state.getNextCycleTs() < now) && (now - state.getNextCycleTs() < TimeUnit.HOURS.toMillis(1))) { |
|
|
|
// FIXME
|
|
|
|
TenantId tenantId = state.getTenantId(); |
|
|
|
state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth()); |
|
|
|
saveNewCounts(state, Arrays.asList(ApiUsageRecordKey.values())); |
|
|
|
updateTenantState((TenantApiUsageState) state, tenantProfileCache.get(tenantId)); |
|
|
|
if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
TenantId tenantId = state.getTenantId(); |
|
|
|
updateTenantState((TenantApiUsageState) state, tenantProfileCache.get(tenantId)); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
} finally { |
|
|
|
@ -394,29 +403,30 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
} |
|
|
|
|
|
|
|
private BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId entityId) { |
|
|
|
if (entityId == null || entityId.isNullUid()) { |
|
|
|
entityId = tenantId; |
|
|
|
} |
|
|
|
BaseApiUsageState state = myUsageStates.get(entityId); |
|
|
|
if (state != null) { |
|
|
|
return state; |
|
|
|
} |
|
|
|
|
|
|
|
ApiUsageState storedState = Optional.ofNullable(apiUsageStateService.findApiUsageStateByEntityId(entityId)) |
|
|
|
.orElseGet(() -> { |
|
|
|
try { |
|
|
|
return apiUsageStateService.createDefaultApiUsageState(tenantId, entityId); |
|
|
|
} catch (Exception e) { |
|
|
|
return apiUsageStateService.findApiUsageStateByEntityId(entityId); |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
switch (entityId.getEntityType()) { |
|
|
|
case TENANT: |
|
|
|
TenantProfile tenantProfile = tenantProfileCache.get(tenantId); |
|
|
|
state = new TenantApiUsageState(tenantProfile, storedState); |
|
|
|
break; |
|
|
|
case CUSTOMER: |
|
|
|
default: |
|
|
|
state = new CustomerApiUsageState(storedState); |
|
|
|
break; |
|
|
|
ApiUsageState storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId); |
|
|
|
if (storedState == null) { |
|
|
|
try { |
|
|
|
storedState = apiUsageStateService.createDefaultApiUsageState(tenantId, entityId); |
|
|
|
} catch (Exception e) { |
|
|
|
storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId); |
|
|
|
} |
|
|
|
} |
|
|
|
if (entityId.getEntityType() == EntityType.TENANT) { |
|
|
|
if (!entityId.equals(TenantId.SYS_TENANT_ID)) { |
|
|
|
state = new TenantApiUsageState(tenantProfileCache.get((TenantId) entityId), storedState); |
|
|
|
} else { |
|
|
|
state = new TenantApiUsageState(storedState); |
|
|
|
} |
|
|
|
} else { |
|
|
|
state = new CustomerApiUsageState(storedState); |
|
|
|
} |
|
|
|
|
|
|
|
List<ApiUsageRecordKey> newCounts = new ArrayList<>(); |
|
|
|
@ -454,54 +464,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
return state; |
|
|
|
} |
|
|
|
|
|
|
|
private TenantApiUsageState getOrFetchState(TenantId tenantId) { |
|
|
|
TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId); |
|
|
|
if (tenantState == null) { |
|
|
|
ApiUsageState dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); |
|
|
|
if (dbStateEntity == null) { |
|
|
|
try { |
|
|
|
dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId, null); |
|
|
|
} catch (Exception e) { |
|
|
|
dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); |
|
|
|
} |
|
|
|
} |
|
|
|
TenantProfile tenantProfile = tenantProfileCache.get(tenantId); |
|
|
|
tenantState = new TenantApiUsageState(tenantProfile, dbStateEntity); |
|
|
|
List<ApiUsageRecordKey> newCounts = new ArrayList<>(); |
|
|
|
try { |
|
|
|
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getId()).get(); |
|
|
|
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { |
|
|
|
boolean cycleEntryFound = false; |
|
|
|
boolean hourlyEntryFound = false; |
|
|
|
for (TsKvEntry tsKvEntry : dbValues) { |
|
|
|
if (tsKvEntry.getKey().equals(key.getApiCountKey())) { |
|
|
|
cycleEntryFound = true; |
|
|
|
|
|
|
|
boolean oldCount = tsKvEntry.getTs() == tenantState.getCurrentCycleTs(); |
|
|
|
tenantState.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
|
|
|
|
if (!oldCount) { |
|
|
|
newCounts.add(key); |
|
|
|
} |
|
|
|
} else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) { |
|
|
|
hourlyEntryFound = true; |
|
|
|
tenantState.putHourly(key, tsKvEntry.getTs() == tenantState.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L); |
|
|
|
} |
|
|
|
if (cycleEntryFound && hourlyEntryFound) { |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
log.debug("[{}] Initialized state: {}", tenantId, dbStateEntity); |
|
|
|
myUsageStates.put(tenantId, tenantState); |
|
|
|
saveNewCounts(tenantState, newCounts); |
|
|
|
} catch (InterruptedException | ExecutionException e) { |
|
|
|
log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); |
|
|
|
} |
|
|
|
} |
|
|
|
return tenantState; |
|
|
|
} |
|
|
|
|
|
|
|
private void initStatesFromDataBase() { |
|
|
|
try { |
|
|
|
log.info("Initializing tenant states."); |
|
|
|
@ -516,7 +478,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa |
|
|
|
log.debug("[{}] Initializing tenant state.", tenant.getId()); |
|
|
|
futures.add(tmpInitExecutor.submit(() -> { |
|
|
|
try { |
|
|
|
updateTenantState(getOrFetchState(tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); |
|
|
|
updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); |
|
|
|
log.debug("[{}] Initialized tenant state.", tenant.getId()); |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); |
|
|
|
|