diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ae7d8c492e..1ddb99d7b7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; @@ -269,7 +270,12 @@ class DefaultTbContext implements TbContext { @Override public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return TbMsg.newMsg(queueName, type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + return newMsg(queueName, type, originator, null, metaData, data); + } + + @Override + public TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return TbMsg.newMsg(queueName, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 573310f7cd..9fd33a91d7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -106,7 +106,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse)); + deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser); } @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index daca2d1a5d..d311cdbfed 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -449,7 +450,7 @@ public class TelemetryController extends BaseController { TenantProfile tenantProfile = tenantProfileCache.get(tenantId); tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays()); } - tsSubService.saveAndNotify(tenantId, entityId, entries, tenantTtl, new FutureCallback() { + tsSubService.saveAndNotify(tenantId, user.getCustomerId(), entityId, entries, tenantTtl, new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { logTelemetryUpdated(user, entityId, entries, null); diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java new file mode 100644 index 0000000000..f22fd1c6e7 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java @@ -0,0 +1,148 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.apiusage; + +import lombok.Getter; +import org.springframework.data.util.Pair; +import org.thingsboard.server.common.data.ApiFeature; +import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.tools.SchedulerUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public abstract class BaseApiUsageState { + private final Map currentCycleValues = new ConcurrentHashMap<>(); + private final Map currentHourValues = new ConcurrentHashMap<>(); + + @Getter + private final ApiUsageState apiUsageState; + @Getter + private volatile long currentCycleTs; + @Getter + private volatile long nextCycleTs; + @Getter + private volatile long currentHourTs; + + public BaseApiUsageState(ApiUsageState apiUsageState) { + this.apiUsageState = apiUsageState; + this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth(); + this.nextCycleTs = SchedulerUtils.getStartOfNextMonth(); + this.currentHourTs = SchedulerUtils.getStartOfCurrentHour(); + } + + public void put(ApiUsageRecordKey key, Long value) { + currentCycleValues.put(key, value); + } + + public void putHourly(ApiUsageRecordKey key, Long value) { + currentHourValues.put(key, value); + } + + public long add(ApiUsageRecordKey key, long value) { + long result = currentCycleValues.getOrDefault(key, 0L) + value; + currentCycleValues.put(key, result); + return result; + } + + public long get(ApiUsageRecordKey key) { + return currentCycleValues.getOrDefault(key, 0L); + } + + public long addToHourly(ApiUsageRecordKey key, long value) { + long result = currentHourValues.getOrDefault(key, 0L) + value; + currentHourValues.put(key, result); + return result; + } + + public void setHour(long currentHourTs) { + this.currentHourTs = currentHourTs; + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + currentHourValues.put(key, 0L); + } + } + + public void setCycles(long currentCycleTs, long nextCycleTs) { + this.currentCycleTs = currentCycleTs; + this.nextCycleTs = nextCycleTs; + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + currentCycleValues.put(key, 0L); + } + } + + public ApiUsageStateValue getFeatureValue(ApiFeature feature) { + switch (feature) { + case TRANSPORT: + return apiUsageState.getTransportState(); + case RE: + return apiUsageState.getReExecState(); + case DB: + return apiUsageState.getDbStorageState(); + case JS: + return apiUsageState.getJsExecState(); + case EMAIL: + return apiUsageState.getEmailExecState(); + case SMS: + return apiUsageState.getSmsExecState(); + default: + return ApiUsageStateValue.ENABLED; + } + } + + public boolean setFeatureValue(ApiFeature feature, ApiUsageStateValue value) { + ApiUsageStateValue currentValue = getFeatureValue(feature); + switch (feature) { + case TRANSPORT: + apiUsageState.setTransportState(value); + break; + case RE: + apiUsageState.setReExecState(value); + break; + case DB: + apiUsageState.setDbStorageState(value); + break; + case JS: + apiUsageState.setJsExecState(value); + break; + case EMAIL: + apiUsageState.setEmailExecState(value); + break; + case SMS: + apiUsageState.setSmsExecState(value); + break; + } + return !currentValue.equals(value); + } + + public abstract EntityType getEntityType(); + + public TenantId getTenantId() { + return getApiUsageState().getTenantId(); + } + + public EntityId getEntityId() { + return getApiUsageState().getEntityId(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/CustomerApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/CustomerApiUsageState.java new file mode 100644 index 0000000000..afc2c47bdb --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/CustomerApiUsageState.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.apiusage; + +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.EntityType; + +public class CustomerApiUsageState extends BaseApiUsageState { + public CustomerApiUsageState(ApiUsageState apiUsageState) { + super(apiUsageState); + } + + @Override + public EntityType getEntityType() { + return EntityType.CUSTOMER; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index c2549803e5..9c7915d4c4 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -29,10 +29,13 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateMailMessage; import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -45,6 +48,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.tools.SchedulerUtils; +import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -63,6 +67,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -95,6 +100,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener myTenantStates = new ConcurrentHashMap<>(); - // Tenants that should be processed on other servers - private final Map otherTenantStates = new ConcurrentHashMap<>(); + // Entities that should be processed on this server + private final Map myUsageStates = new ConcurrentHashMap<>(); + // Entities that should be processed on other servers + private final Map otherUsageStates = new ConcurrentHashMap<>(); + + private final Set deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>()); @Value("${usage.stats.report.enabled:true}") private boolean enabled; @@ -123,13 +131,16 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener msg, TbCallback callback) { ToUsageStatsServiceMsg statsMsg = msg.getValue(); - TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); - if (tenantProfileCache.get(tenantId) == null) { - return; + TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); + EntityId entityId; + if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) { + entityId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB())); + } else { + entityId = tenantId; } - TenantApiUsageState tenantState; + processEntityUsageStats(tenantId, entityId, statsMsg.getValuesList()); + callback.onSuccess(); + } + + private void processEntityUsageStats(TenantId tenantId, EntityId entityId, List values) { + if (deletedEntities.contains(entityId)) return; + + BaseApiUsageState usageState; List updatedEntries; Map result; + updateLock.lock(); try { - tenantState = getOrFetchState(tenantId); - long ts = tenantState.getCurrentCycleTs(); - long hourTs = tenantState.getCurrentHourTs(); + usageState = getOrFetchState(tenantId, entityId); + long ts = usageState.getCurrentCycleTs(); + long hourTs = usageState.getCurrentHourTs(); long newHourTs = SchedulerUtils.getStartOfCurrentHour(); if (newHourTs != hourTs) { - tenantState.setHour(newHourTs); + usageState.setHour(newHourTs); } updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); Set apiFeatures = new HashSet<>(); - for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) { + for (UsageStatsKVProto kvProto : values) { ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey()); - long newValue = tenantState.add(recordKey, kvProto.getValue()); + long newValue = usageState.add(recordKey, kvProto.getValue()); updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue))); - long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue()); + long newHourlyValue = usageState.addToHourly(recordKey, kvProto.getValue()); updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue))); apiFeatures.add(recordKey.getApiFeature()); } - result = tenantState.checkStateUpdatedDueToThreshold(apiFeatures); + if (usageState.getEntityType() == EntityType.TENANT && !usageState.getEntityId().equals(TenantId.SYS_TENANT_ID)) { + result = ((TenantApiUsageState) usageState).checkStateUpdatedDueToThreshold(apiFeatures); + } else { + result = Collections.emptyMap(); + } } finally { updateLock.unlock(); } - tsWsService.saveAndNotifyInternal(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK); + tsWsService.saveAndNotifyInternal(tenantId, usageState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK); if (!result.isEmpty()) { - persistAndNotify(tenantState, result); + persistAndNotify(usageState, result); } - callback.onSuccess(); } @Override protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) { - myTenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition()); - otherTenantStates.entrySet().removeIf(entry -> partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition()); + myUsageStates.entrySet().removeIf(entry -> { + return !partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); + }); + otherUsageStates.entrySet().removeIf(entry -> { + return partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); + }); initStatesFromDataBase(); } } @Override public ApiUsageState getApiUsageState(TenantId tenantId) { - TenantApiUsageState tenantState = myTenantStates.get(tenantId); + TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId); if (tenantState != null) { return tenantState.getApiUsageState(); } else { - ApiUsageState state = otherTenantStates.get(tenantId); + ApiUsageState state = otherUsageStates.get(tenantId); if (state != null) { return state; } else { if (partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { - return getOrFetchState(tenantId).getApiUsageState(); + return getOrFetchState(tenantId, tenantId).getApiUsageState(); } else { updateLock.lock(); try { - state = otherTenantStates.get(tenantId); + state = otherUsageStates.get(tenantId); if (state == null) { state = apiUsageStateService.findTenantApiUsageState(tenantId); if (state != null) { - otherTenantStates.put(tenantId, state); + otherUsageStates.put(tenantId, state); } } } finally { @@ -231,7 +260,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener { - if (tenantProfile.getId().equals(state.getTenantProfileId())) { - updateTenantState(state, tenantProfile); - } - }); + myUsageStates.values().stream() + .filter(state -> state.getEntityType() == EntityType.TENANT) + .map(state -> (TenantApiUsageState) state) + .forEach(state -> { + if (tenantProfile.getId().equals(state.getTenantProfileId())) { + updateTenantState(state, tenantProfile); + } + }); } finally { updateLock.unlock(); } @@ -256,7 +288,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener result) { - log.info("[{}] Detected update of the API state: {}", state.getTenantId(), result); + public void onTenantDelete(TenantId tenantId) { + deletedEntities.add(tenantId); + myUsageStates.remove(tenantId); + otherUsageStates.remove(tenantId); + } + + @Override + public void onCustomerDelete(CustomerId customerId) { + deletedEntities.add(customerId); + myUsageStates.remove(customerId); + } + + private void persistAndNotify(BaseApiUsageState state, Map result) { + log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result); apiUsageStateService.update(state.getApiUsageState()); clusterService.onApiStateChange(state.getApiUsageState(), null); long ts = System.currentTimeMillis(); List stateTelemetry = new ArrayList<>(); - result.forEach(((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name()))))); + result.forEach((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name())))); tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK); - String email = tenantService.findTenantById(state.getTenantId()).getEmail(); - - if (StringUtils.isNotEmpty(email)) { - result.forEach((apiFeature, stateValue) -> { - mailExecutor.submit(() -> { - try { - mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage(state, apiFeature, stateValue)); - } catch (ThingsboardException e) { - log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e); - } + if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { + String email = tenantService.findTenantById(state.getTenantId()).getEmail(); + if (StringUtils.isNotEmpty(email)) { + result.forEach((apiFeature, stateValue) -> { + mailExecutor.submit(() -> { + try { + mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage((TenantApiUsageState) state, apiFeature, stateValue)); + } catch (ThingsboardException e) { + log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e); + } + }); }); - }); - } else { - log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId()); + } else { + log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId()); + } } } @@ -350,12 +395,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener { + myUsageStates.values().forEach(state -> { if ((state.getNextCycleTs() < now) && (now - state.getNextCycleTs() < TimeUnit.HOURS.toMillis(1))) { - TenantId tenantId = state.getTenantId(); state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth()); saveNewCounts(state, Arrays.asList(ApiUsageRecordKey.values())); - updateTenantState(state, tenantProfileCache.get(tenantId)); + if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { + TenantId tenantId = state.getTenantId(); + updateTenantState((TenantApiUsageState) state, tenantProfileCache.get(tenantId)); + } } }); } finally { @@ -363,7 +410,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener keys) { + private void saveNewCounts(BaseApiUsageState state, List keys) { List counts = keys.stream() .map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L))) .collect(Collectors.toList()); @@ -371,52 +418,66 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener newCounts = new ArrayList<>(); + private BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId entityId) { + if (entityId == null || entityId.isNullUid()) { + entityId = tenantId; + } + BaseApiUsageState state = myUsageStates.get(entityId); + if (state != null) { + return state; + } + + ApiUsageState storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId); + if (storedState == null) { try { - List dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getId()).get(); - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - boolean cycleEntryFound = false; - boolean hourlyEntryFound = false; - for (TsKvEntry tsKvEntry : dbValues) { - if (tsKvEntry.getKey().equals(key.getApiCountKey())) { - cycleEntryFound = true; - - boolean oldCount = tsKvEntry.getTs() == tenantState.getCurrentCycleTs(); - tenantState.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L); - - if (!oldCount) { - newCounts.add(key); - } - } else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) { - hourlyEntryFound = true; - tenantState.putHourly(key, tsKvEntry.getTs() == tenantState.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L); - } - if (cycleEntryFound && hourlyEntryFound) { - break; + storedState = apiUsageStateService.createDefaultApiUsageState(tenantId, entityId); + } catch (Exception e) { + storedState = apiUsageStateService.findApiUsageStateByEntityId(entityId); + } + } + if (entityId.getEntityType() == EntityType.TENANT) { + if (!entityId.equals(TenantId.SYS_TENANT_ID)) { + state = new TenantApiUsageState(tenantProfileCache.get((TenantId) entityId), storedState); + } else { + state = new TenantApiUsageState(storedState); + } + } else { + state = new CustomerApiUsageState(storedState); + } + + List newCounts = new ArrayList<>(); + try { + List dbValues = tsService.findAllLatest(tenantId, storedState.getId()).get(); + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + boolean cycleEntryFound = false; + boolean hourlyEntryFound = false; + for (TsKvEntry tsKvEntry : dbValues) { + if (tsKvEntry.getKey().equals(key.getApiCountKey())) { + cycleEntryFound = true; + + boolean oldCount = tsKvEntry.getTs() == state.getCurrentCycleTs(); + state.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L); + + if (!oldCount) { + newCounts.add(key); } + } else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) { + hourlyEntryFound = true; + state.putHourly(key, tsKvEntry.getTs() == state.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L); + } + if (cycleEntryFound && hourlyEntryFound) { + break; } } - log.debug("[{}] Initialized state: {}", tenantId, dbStateEntity); - myTenantStates.put(tenantId, tenantState); - saveNewCounts(tenantState, newCounts); - } catch (InterruptedException | ExecutionException e) { - log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); } + log.debug("[{}] Initialized state: {}", entityId, storedState); + myUsageStates.put(entityId, state); + saveNewCounts(state, newCounts); + } catch (InterruptedException | ExecutionException e) { + log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); } - return tenantState; + + return state; } private void initStatesFromDataBase() { @@ -429,11 +490,11 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); List> futures = new ArrayList<>(); for (Tenant tenant : tenantIterator) { - if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { + if (!myUsageStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { log.debug("[{}] Initializing tenant state.", tenant.getId()); futures.add(tmpInitExecutor.submit(() -> { try { - updateTenantState(getOrFetchState(tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); + updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); log.debug("[{}] Initialized tenant state.", tenant.getId()); } catch (Exception e) { log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index 117607ae9f..27755b9b39 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.apiusage; import org.springframework.context.ApplicationListener; import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -34,5 +35,9 @@ public interface TbApiUsageStateService extends ApplicationListener currentCycleValues = new ConcurrentHashMap<>(); - private final Map currentHourValues = new ConcurrentHashMap<>(); +public class TenantApiUsageState extends BaseApiUsageState { @Getter @Setter private TenantProfileId tenantProfileId; @Getter @Setter private TenantProfileData tenantProfileData; - @Getter - private final ApiUsageState apiUsageState; - @Getter - private volatile long currentCycleTs; - @Getter - private volatile long nextCycleTs; - @Getter - private volatile long currentHourTs; public TenantApiUsageState(TenantProfile tenantProfile, ApiUsageState apiUsageState) { + super(apiUsageState); this.tenantProfileId = tenantProfile.getId(); this.tenantProfileData = tenantProfile.getProfileData(); - this.apiUsageState = apiUsageState; - this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth(); - this.nextCycleTs = SchedulerUtils.getStartOfNextMonth(); - this.currentHourTs = SchedulerUtils.getStartOfCurrentHour(); - } - - public void put(ApiUsageRecordKey key, Long value) { - currentCycleValues.put(key, value); } - public void putHourly(ApiUsageRecordKey key, Long value) { - currentHourValues.put(key, value); - } - - public long add(ApiUsageRecordKey key, long value) { - long result = currentCycleValues.getOrDefault(key, 0L) + value; - currentCycleValues.put(key, result); - return result; - } - - public long get(ApiUsageRecordKey key) { - return currentCycleValues.getOrDefault(key, 0L); - } - - public long addToHourly(ApiUsageRecordKey key, long value) { - long result = currentHourValues.getOrDefault(key, 0L) + value; - currentHourValues.put(key, result); - return result; - } - - public void setHour(long currentHourTs) { - this.currentHourTs = currentHourTs; - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - currentHourValues.put(key, 0L); - } - } - - public void setCycles(long currentCycleTs, long nextCycleTs) { - this.currentCycleTs = currentCycleTs; - this.nextCycleTs = nextCycleTs; - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - currentCycleValues.put(key, 0L); - } + public TenantApiUsageState(ApiUsageState apiUsageState) { + super(apiUsageState); } public long getProfileThreshold(ApiUsageRecordKey key) { @@ -111,53 +59,25 @@ public class TenantApiUsageState { return tenantProfileData.getConfiguration().getWarnThreshold(key); } - public TenantId getTenantId() { - return apiUsageState.getTenantId(); - } - - public ApiUsageStateValue getFeatureValue(ApiFeature feature) { - switch (feature) { - case TRANSPORT: - return apiUsageState.getTransportState(); - case RE: - return apiUsageState.getReExecState(); - case DB: - return apiUsageState.getDbStorageState(); - case JS: - return apiUsageState.getJsExecState(); - case EMAIL: - return apiUsageState.getEmailExecState(); - case SMS: - return apiUsageState.getSmsExecState(); - default: - return ApiUsageStateValue.ENABLED; + private Pair checkStateUpdatedDueToThreshold(ApiFeature feature) { + ApiUsageStateValue featureValue = ApiUsageStateValue.ENABLED; + for (ApiUsageRecordKey recordKey : ApiUsageRecordKey.getKeys(feature)) { + long value = get(recordKey); + long threshold = getProfileThreshold(recordKey); + long warnThreshold = getProfileWarnThreshold(recordKey); + ApiUsageStateValue tmpValue; + if (threshold == 0 || value == 0 || value < warnThreshold) { + tmpValue = ApiUsageStateValue.ENABLED; + } else if (value < threshold) { + tmpValue = ApiUsageStateValue.WARNING; + } else { + tmpValue = ApiUsageStateValue.DISABLED; + } + featureValue = ApiUsageStateValue.toMoreRestricted(featureValue, tmpValue); } + return setFeatureValue(feature, featureValue) ? Pair.of(feature, featureValue) : null; } - public boolean setFeatureValue(ApiFeature feature, ApiUsageStateValue value) { - ApiUsageStateValue currentValue = getFeatureValue(feature); - switch (feature) { - case TRANSPORT: - apiUsageState.setTransportState(value); - break; - case RE: - apiUsageState.setReExecState(value); - break; - case DB: - apiUsageState.setDbStorageState(value); - break; - case JS: - apiUsageState.setJsExecState(value); - break; - case EMAIL: - apiUsageState.setEmailExecState(value); - break; - case SMS: - apiUsageState.setSmsExecState(value); - break; - } - return !currentValue.equals(value); - } public Map checkStateUpdatedDueToThresholds() { return checkStateUpdatedDueToThreshold(new HashSet<>(Arrays.asList(ApiFeature.values()))); @@ -174,23 +94,9 @@ public class TenantApiUsageState { return result; } - public Pair checkStateUpdatedDueToThreshold(ApiFeature feature) { - ApiUsageStateValue featureValue = ApiUsageStateValue.ENABLED; - for (ApiUsageRecordKey recordKey : ApiUsageRecordKey.getKeys(feature)) { - long value = get(recordKey); - long threshold = getProfileThreshold(recordKey); - long warnThreshold = getProfileWarnThreshold(recordKey); - ApiUsageStateValue tmpValue; - if (threshold == 0 || value == 0 || value < warnThreshold) { - tmpValue = ApiUsageStateValue.ENABLED; - } else if (value < threshold) { - tmpValue = ApiUsageStateValue.WARNING; - } else { - tmpValue = ApiUsageStateValue.DISABLED; - } - featureValue = ApiUsageStateValue.toMoreRestricted(featureValue, tmpValue); - } - return setFeatureValue(feature, featureValue) ? Pair.of(feature, featureValue) : null; + @Override + public EntityType getEntityType() { + return EntityType.TENANT; } } diff --git a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java index 3fea206c3b..4ba4080a74 100644 --- a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java +++ b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java @@ -230,7 +230,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { private void pushProvisionEventToRuleEngine(ProvisionRequest request, Device device, String type) { try { JsonNode entityNode = JacksonUtil.valueToTree(request); - TbMsg msg = TbMsg.newMsg(type, device.getId(), createTbMsgMetaData(device), JacksonUtil.toString(entityNode)); + TbMsg msg = TbMsg.newMsg(type, device.getId(), device.getCustomerId(), createTbMsgMetaData(device), JacksonUtil.toString(entityNode)); sendToRuleEngine(device.getTenantId(), msg, null); } catch (IllegalArgumentException e) { log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), type, e); @@ -240,7 +240,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { private void pushDeviceCreatedEventToRuleEngine(Device device) { try { ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device); - TbMsg msg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, device.getId(), createTbMsgMetaData(device), JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); + TbMsg msg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, device.getId(), device.getCustomerId(), createTbMsgMetaData(device), JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); sendToRuleEngine(device.getTenantId(), msg, null); } catch (JsonProcessingException | IllegalArgumentException e) { log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 70978df0a3..fb01926e2a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -920,7 +920,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), edge.getCustomerId(), entityData)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java index 35eea3f962..216e5560d5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java @@ -231,7 +231,7 @@ public class DeviceProcessor extends BaseProcessor { try { DeviceId deviceId = device.getId(); ObjectNode entityNode = mapper.valueToTree(device); - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, device.getCustomerId(), getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { @Override diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index c858955bfc..53ef04fe85 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -70,7 +70,7 @@ public class TelemetryProcessor extends BaseProcessor { private final Gson gson = new Gson(); - public List> onTelemetryUpdate(TenantId tenantId, EntityDataProto entityData) { + public List> onTelemetryUpdate(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); @@ -80,14 +80,14 @@ public class TelemetryProcessor extends BaseProcessor { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { - result.add(processPostAttributes(tenantId, entityId, entityData.getPostAttributesMsg(), metaData)); + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); } if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); } if (entityData.hasPostTelemetryMsg()) { - result.add(processPostTelemetry(tenantId, entityId, entityData.getPostTelemetryMsg(), metaData)); + result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } } if (entityData.hasAttributeDeleteMsg()) { @@ -148,7 +148,7 @@ public class TelemetryProcessor extends BaseProcessor { } } - private ListenableFuture processPostTelemetry(TenantId tenantId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); @@ -156,7 +156,7 @@ public class TelemetryProcessor extends BaseProcessor { Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); String queueName = defaultQueueAndRuleChain.getKey(); RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); - TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null); + TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null); tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { @@ -173,13 +173,13 @@ public class TelemetryProcessor extends BaseProcessor { return futureToSet; } - private ListenableFuture processPostAttributes(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); String queueName = defaultQueueAndRuleChain.getKey(); RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); - TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null); + TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null); tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { @@ -195,7 +195,7 @@ public class TelemetryProcessor extends BaseProcessor { return futureToSet; } - private ListenableFuture processAttributesUpdate(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processAttributesUpdate(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Set attributes = JsonConverter.convertToAttributes(json); @@ -206,7 +206,7 @@ public class TelemetryProcessor extends BaseProcessor { Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); String queueName = defaultQueueAndRuleChain.getKey(); RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); - TbMsg tbMsg = TbMsg.newMsg(queueName, DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json), ruleChainId, null); + TbMsg tbMsg = TbMsg.newMsg(queueName, DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), ruleChainId, null); tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index 9a989e38f8..470f39f401 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -390,7 +390,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService pageData = tenantService.findTenants(pageLink); for (Tenant tenant : pageData.getData()) { try { - apiUsageStateService.createDefaultApiUsageState(tenant.getId()); + apiUsageStateService.createDefaultApiUsageState(tenant.getId(), null); } catch (Exception e) { } List deviceTypes = deviceService.findDeviceTypesByTenantId(tenant.getId()).get(); diff --git a/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java b/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java index 972c99f866..b4c1058616 100644 --- a/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java +++ b/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.ApiUsageStateMailMessage; import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -233,7 +234,7 @@ public class DefaultMailService implements MailService { } @Override - public void send(TenantId tenantId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException { + public void send(TenantId tenantId, CustomerId customerId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException { if (apiUsageStateService.getApiUsageState(tenantId).isEmailSendEnabled()) { MimeMessage mailMsg = mailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(mailMsg, "UTF-8"); @@ -248,7 +249,7 @@ public class DefaultMailService implements MailService { helper.setSubject(subject); helper.setText(body); mailSender.send(helper.getMimeMessage()); - apiUsageClient.report(tenantId, ApiUsageRecordKey.EMAIL_EXEC_COUNT, 1); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.EMAIL_EXEC_COUNT, 1); } else { throw new RuntimeException("Email sending is disabled due to API limits!"); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 2f5610d7ff..fff648401b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 02378eb557..206e330658 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -18,14 +18,15 @@ package org.thingsboard.server.service.queue.processing; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.ApplicationListener; import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbActorMsg; @@ -72,7 +73,8 @@ public abstract class AbstractConsumerService> nfConsumer; public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, - TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache, TbApiUsageStateService apiUsageStateService, TbQueueConsumer> nfConsumer) { + TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache, + TbApiUsageStateService apiUsageStateService, TbQueueConsumer> nfConsumer) { this.actorContext = actorContext; this.encodingService = encodingService; this.tenantProfileCache = tenantProfileCache; @@ -166,6 +168,8 @@ public abstract class AbstractConsumerService responseConsumer) { + public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer responseConsumer, SecurityUser currentUser) { log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToRuleEngineRpcRequests.put(requestId, responseConsumer); - sendRpcRequestToRuleEngine(request); + sendRpcRequestToRuleEngine(request, currentUser); scheduleToRuleEngineTimeout(request, requestId); } @@ -149,7 +150,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } } - private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg) { + private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg, SecurityUser currentUser) { ObjectNode entityNode = json.createObjectNode(); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("requestUUID", msg.getId().toString()); @@ -167,7 +168,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { entityNode.put("params", msg.getBody().getParams()); try { - TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); + TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), currentUser.getCustomerId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null); } catch (JsonProcessingException e) { throw new RuntimeException(e); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java index 74e7072611..c3c6f9b888 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.rpc; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.service.security.model.SecurityUser; import java.util.function.Consumer; @@ -27,11 +28,11 @@ public interface TbCoreDeviceRpcService { /** * Handles REST API calls that contain RPC requests to Device and pushes them to Rule Engine. * Schedules the timeout for the RPC call based on the {@link ToDeviceRpcRequest} - * - * @param request the RPC request + * @param request the RPC request * @param responseConsumer the consumer of the RPC response + * @param currentUser */ - void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer responseConsumer); + void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer responseConsumer, SecurityUser currentUser); /** * Handles the RPC response from the Rule Engine. diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java index 7342169a86..2f0378ae9d 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; @@ -73,14 +74,14 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { } @Override - public ListenableFuture invokeFunction(TenantId tenantId, UUID scriptId, Object... args) { + public ListenableFuture invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) { if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) { String functionName = scriptIdToNameMap.get(scriptId); if (functionName == null) { return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!")); } if (!isDisabled(scriptId)) { - apiUsageClient.report(tenantId, ApiUsageRecordKey.JS_EXEC_COUNT, 1); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1); return doInvokeFunction(scriptId, functionName, args); } else { return Futures.immediateFailedFuture( diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java index 1636b2cbf0..317170f1a7 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java @@ -16,7 +16,7 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import java.util.UUID; @@ -25,7 +25,7 @@ public interface JsInvokeService { ListenableFuture eval(TenantId tenantId, JsScriptType scriptType, String scriptBody, String... argNames); - ListenableFuture invokeFunction(TenantId tenantId, UUID scriptId, Object... args); + ListenableFuture invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args); ListenableFuture release(UUID scriptId); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 5dffc0217c..066ce71a58 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -218,7 +218,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private JsonNode executeScript(TbMsg msg) throws ScriptException { try { String[] inArgs = prepareArgs(msg); - String eval = sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); + String eval = sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); return mapper.readTree(eval); } catch (ExecutionException e) { if (e.getCause() instanceof ScriptException) { @@ -235,7 +235,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private ListenableFuture executeScriptAsync(TbMsg msg) { String[] inArgs = prepareArgs(msg); - return Futures.transformAsync(sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]), + return Futures.transformAsync(sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]), o -> { try { return Futures.immediateFuture(mapper.readTree(o.toString())); diff --git a/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java b/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java index f7a012b23e..f0b3eb174e 100644 --- a/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java +++ b/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.sms.SmsSender; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.sms.config.SmsProviderConfiguration; import org.thingsboard.server.common.data.sms.config.TestSmsRequest; import org.thingsboard.server.common.data.AdminSettings; @@ -94,7 +95,7 @@ public class DefaultSmsService implements SmsService { } @Override - public void sendSms(TenantId tenantId, String[] numbersTo, String message) throws ThingsboardException { + public void sendSms(TenantId tenantId, CustomerId customerId, String[] numbersTo, String message) throws ThingsboardException { if (apiUsageStateService.getApiUsageState(tenantId).isSmsSendEnabled()) { int smsCount = 0; try { @@ -103,7 +104,7 @@ public class DefaultSmsService implements SmsService { } } finally { if (smsCount > 0) { - apiUsageClient.report(tenantId, ApiUsageRecordKey.SMS_EXEC_COUNT, smsCount); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.SMS_EXEC_COUNT, smsCount); } } } else { diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index c8a4e80e0e..599cdbb730 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -468,6 +468,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener ts, FutureCallback callback) { - saveAndNotify(tenantId, entityId, ts, 0L, callback); + saveAndNotify(tenantId, null, entityId, ts, 0L, callback); } @Override - public void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { + public void saveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List ts, long ttl, FutureCallback callback) { checkInternalEntity(entityId); boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { @@ -127,7 +125,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void onSuccess(Integer result) { if (!sysTenant && result != null && result > 0) { - apiUsageClient.report(tenantId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); } callback.onSuccess(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 6da8db7a93..f9d63c0edd 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -86,6 +86,7 @@ import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.resource.TbResourceService; import org.thingsboard.server.service.state.DeviceStateService; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -284,7 +285,7 @@ public class DefaultTransportApiService implements TransportApiService { DeviceId deviceId = device.getId(); ObjectNode entityNode = mapper.valueToTree(device); - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); + TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); } GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() @@ -423,6 +424,8 @@ public class DefaultTransportApiService implements TransportApiService { return DeviceInfoProto.newBuilder() .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(Optional.ofNullable(device.getCustomerId()).map(customerId -> customerId.getId().getMostSignificantBits()).orElse(0L)) + .setCustomerIdLSB(Optional.ofNullable(device.getCustomerId()).map(customerId -> customerId.getId().getLeastSignificantBits()).orElse(0L)) .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) .setDeviceName(device.getName()) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0fd38db24e..f335fb190d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -124,6 +124,7 @@ usage: stats: report: enabled: "${USAGE_STATS_REPORT_ENABLED:true}" + enabled_per_customer: "${USAGE_STATS_REPORT_PER_CUSTOMER_ENABLED:false}" interval: "${USAGE_STATS_REPORT_INTERVAL:10}" check: cycle: "${USAGE_STATS_CHECK_CYCLE:60000}" diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index 2249940c48..2532fbc00d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -21,12 +21,12 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; @@ -750,7 +750,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { private void sendTelemetry(Device device, List tsData) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback() { + tsService.saveAndNotify(device.getTenantId(), null, device.getId(), tsData, 0, new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { latch.countDown(); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java index 5b0985e66c..6dc4af98f4 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java @@ -17,17 +17,22 @@ package org.thingsboard.server.dao.usagerecord; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; public interface ApiUsageStateService { - ApiUsageState createDefaultApiUsageState(TenantId id); + ApiUsageState createDefaultApiUsageState(TenantId id, EntityId entityId); ApiUsageState update(ApiUsageState apiUsageState); ApiUsageState findTenantApiUsageState(TenantId tenantId); + ApiUsageState findApiUsageStateByEntityId(EntityId entityId); + void deleteApiUsageStateByTenantId(TenantId tenantId); + void deleteApiUsageStateByEntityId(EntityId entityId); + ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/ApiUsageStateFilter.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/ApiUsageStateFilter.java index f1008feb4f..f2f90472c0 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/ApiUsageStateFilter.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/ApiUsageStateFilter.java @@ -16,10 +16,13 @@ package org.thingsboard.server.common.data.query; import lombok.Data; -import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.CustomerId; @Data public class ApiUsageStateFilter implements EntityFilter { + + private CustomerId customerId; + @Override public EntityFilterType getType() { return EntityFilterType.API_USAGE_STATE; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 66a3670ddd..1207f4d634 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -19,10 +19,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import lombok.AccessLevel; -import lombok.Builder; import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.RuleChainId; @@ -47,6 +48,7 @@ public final class TbMsg implements Serializable { private final long ts; private final String type; private final EntityId originator; + private final CustomerId customerId; private final TbMsgMetaData metaData; private final TbMsgDataType dataType; private final String data; @@ -55,6 +57,7 @@ public final class TbMsg implements Serializable { @Getter(value = AccessLevel.NONE) private final AtomicInteger ruleNodeExecCounter; + public int getAndIncrementRuleNodeCounter() { return ruleNodeExecCounter.getAndIncrement(); } @@ -64,60 +67,81 @@ public final class TbMsg implements Serializable { transient private final TbMsgCallback callback; public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, + return newMsg(queueName, type, originator, null, metaData, data, ruleChainId, ruleNodeId); + } + + public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); + return newMsg(type, originator, null, metaData, data); + } + + public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); } // REALLY NEW MSG public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); + return newMsg(queueName, type, originator, null, metaData, data); + } + + public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); + } + + public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY); } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY); + return newMsg(type, originator, null, metaData, dataType, data); } // For Tests only public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, callback); + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, callback); } public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(tbMsg.getQueueName(), tbMsg.getId(), tbMsg.getTs(), type, originator, metaData.copy(), tbMsg.getDataType(), - data, tbMsg.getRuleChainId(), tbMsg.getRuleNodeId(), tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType, + data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ruleNodeExecCounter.get(), tbMsg.callback); + } + + public static TbMsg transformMsg(TbMsg tbMsg, CustomerId customerId) { + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType, + tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); } public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) { - return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); } public static TbMsg transformMsg(TbMsg tbMsg, String queueName) { - return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); } public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) { - return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback()); } public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), + return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ruleNodeExecCounter.get(), TbMsgCallback.EMPTY); } - private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, + private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) { this.id = id; this.queueName = queueName != null ? queueName : ServiceQueue.MAIN; @@ -128,6 +152,15 @@ public final class TbMsg implements Serializable { } this.type = type; this.originator = originator; + if (customerId == null || customerId.isNullUid()) { + if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) { + this.customerId = (CustomerId) originator; + } else { + this.customerId = null; + } + } else { + this.customerId = customerId; + } this.metaData = metaData; this.dataType = dataType; this.data = data; @@ -154,6 +187,11 @@ public final class TbMsg implements Serializable { builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits()); builder.setEntityIdLSB(msg.getOriginator().getId().getLeastSignificantBits()); + if (msg.getCustomerId() != null) { + builder.setCustomerIdMSB(msg.getCustomerId().getId().getMostSignificantBits()); + builder.setCustomerIdLSB(msg.getCustomerId().getId().getLeastSignificantBits()); + } + if (msg.getRuleChainId() != null) { builder.setRuleChainIdMSB(msg.getRuleChainId().getId().getMostSignificantBits()); builder.setRuleChainIdLSB(msg.getRuleChainId().getId().getLeastSignificantBits()); @@ -179,16 +217,21 @@ public final class TbMsg implements Serializable { MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data); TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + CustomerId customerId = null; RuleChainId ruleChainId = null; RuleNodeId ruleNodeId = null; + if (proto.getCustomerIdMSB() != 0L && proto.getCustomerIdLSB() != 0L) { + customerId = new CustomerId(new UUID(proto.getCustomerIdMSB(), proto.getCustomerIdLSB())); + } if (proto.getRuleChainIdMSB() != 0L && proto.getRuleChainIdLSB() != 0L) { ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); } if (proto.getRuleNodeIdMSB() != 0L && proto.getRuleNodeIdLSB() != 0L) { ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); } + TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getRuleNodeExecCounter(), callback); + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getRuleNodeExecCounter(), callback); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); } @@ -199,11 +242,11 @@ public final class TbMsg implements Serializable { } public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) { - return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, this.ruleNodeExecCounter.get(), callback); + return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, null, this.ruleNodeExecCounter.get(), callback); } public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) { - return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ruleNodeExecCounter.get(), callback); + return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ruleNodeExecCounter.get(), callback); } public TbMsgCallback getCallback() { diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto index 9f2142844d..75f5051fbf 100644 --- a/common/message/src/main/proto/tbmsg.proto +++ b/common/message/src/main/proto/tbmsg.proto @@ -46,4 +46,7 @@ message TbMsgProto { int64 ts = 15; int32 ruleNodeExecCounter = 16; + + int64 customerIdMSB = 17; + int64 customerIdLSB = 18; } \ No newline at end of file diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java index 01a322f158..370de259ef 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java @@ -15,10 +15,14 @@ */ package org.thingsboard.server.queue.usagestats; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -31,6 +35,8 @@ import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import javax.annotation.PostConstruct; +import java.util.EnumMap; +import java.util.Optional; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -44,11 +50,13 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { @Value("${usage.stats.report.enabled:true}") private boolean enabled; + @Value("${usage.stats.report.enabled_per_customer:false}") + private boolean enabledPerCustomer; @Value("${usage.stats.report.interval:10}") private int interval; - @SuppressWarnings("unchecked") - private final ConcurrentMap[] values = new ConcurrentMap[ApiUsageRecordKey.values().length]; + private final EnumMap> stats = new EnumMap<>(ApiUsageRecordKey.class); + private final PartitionService partitionService; private final SchedulerComponent scheduler; private final TbQueueProducerProvider producerProvider; @@ -65,55 +73,93 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { if (enabled) { msgProducer = this.producerProvider.getTbUsageStatsMsgProducer(); for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - values[key.ordinal()] = new ConcurrentHashMap<>(); + stats.put(key, new ConcurrentHashMap<>()); } - scheduler.scheduleWithFixedDelay(this::reportStats, new Random().nextInt(interval), interval, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(() -> { + try { + reportStats(); + } catch (Exception e) { + log.warn("Failed to report statistics: ", e); + } + }, new Random().nextInt(interval), interval, TimeUnit.SECONDS); } } private void reportStats() { - try { - ConcurrentMap report = new ConcurrentHashMap<>(); + ConcurrentMap report = new ConcurrentHashMap<>(); - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - values[key.ordinal()].forEach(((tenantId, atomicLong) -> { - long value = atomicLong.getAndSet(0); - if (value > 0) { - ToUsageStatsServiceMsg.Builder msgBuilder = report.computeIfAbsent(tenantId, id -> { - ToUsageStatsServiceMsg.Builder msg = ToUsageStatsServiceMsg.newBuilder(); - msg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); - msg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); - return msg; - }); - msgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build()); + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + ConcurrentMap statsForKey = stats.get(key); + statsForKey.forEach((ownerId, statsValue) -> { + long value = statsValue.get(); + if (value == 0) return; + + ToUsageStatsServiceMsg.Builder statsMsgBuilder = report.computeIfAbsent(ownerId, id -> { + ToUsageStatsServiceMsg.Builder newStatsMsgBuilder = ToUsageStatsServiceMsg.newBuilder(); + + TenantId tenantId = ownerId.getTenantId(); + newStatsMsgBuilder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); + newStatsMsgBuilder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); + + EntityId entityId = ownerId.getEntityId(); + if (entityId != null && entityId.getEntityType() == EntityType.CUSTOMER) { + newStatsMsgBuilder.setCustomerIdMSB(entityId.getId().getMostSignificantBits()); + newStatsMsgBuilder.setCustomerIdLSB(entityId.getId().getLeastSignificantBits()); } - })); - } - report.forEach(((tenantId, builder) -> { - //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages? - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).newByTopic(msgProducer.getDefaultTopic()); - msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null); - })); - if (!report.isEmpty()) { - log.info("Report statistics for: {} tenants", report.size()); - } - } catch (Exception e) { - log.warn("Failed to report statistics: ", e); + return newStatsMsgBuilder; + }); + + statsMsgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build()); + }); + statsForKey.clear(); + } + + report.forEach(((ownerId, statsMsg) -> { + //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages? + + TenantId tenantId = ownerId.getTenantId(); + EntityId entityId = Optional.ofNullable(ownerId.getEntityId()).orElse(tenantId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId).newByTopic(msgProducer.getDefaultTopic()); + msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), statsMsg.build()), null); + })); + + if (!report.isEmpty()) { + log.info("Reporting API usage statistics for {} tenants and customers", report.size()); } } @Override - public void report(TenantId tenantId, ApiUsageRecordKey key, long value) { + public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) { if (enabled) { - ConcurrentMap map = values[key.ordinal()]; - AtomicLong atomicValue = map.computeIfAbsent(tenantId, id -> new AtomicLong()); - atomicValue.addAndGet(value); + ConcurrentMap statsForKey = stats.get(key); + + statsForKey.computeIfAbsent(new OwnerId(tenantId), id -> new AtomicLong()).addAndGet(value); + statsForKey.computeIfAbsent(new OwnerId(TenantId.SYS_TENANT_ID), id -> new AtomicLong()).addAndGet(value); + + if (enabledPerCustomer && customerId != null && !customerId.isNullUid()) { + statsForKey.computeIfAbsent(new OwnerId(tenantId, customerId), id -> new AtomicLong()).addAndGet(value); + } } } @Override - public void report(TenantId tenantId, ApiUsageRecordKey key) { - report(tenantId, key, 1L); + public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key) { + report(tenantId, customerId, key, 1); + } + + @Data + private static class OwnerId { + private TenantId tenantId; + private EntityId entityId; + + public OwnerId(TenantId tenantId) { + this.tenantId = tenantId; + } + + public OwnerId(TenantId tenantId, EntityId entityId) { + this.tenantId = tenantId; + this.entityId = entityId; + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java index f46a2d0143..be1b071873 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java @@ -16,12 +16,13 @@ package org.thingsboard.server.queue.usagestats; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; public interface TbApiUsageClient { - void report(TenantId tenantId, ApiUsageRecordKey key, long value); + void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value); - void report(TenantId tenantId, ApiUsageRecordKey key); + void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key); } diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 6335670f07..1594ddbbac 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -53,6 +53,8 @@ message SessionInfoProto { int64 gwSessionIdLSB = 11; int64 deviceProfileIdMSB = 12; int64 deviceProfileIdLSB = 13; + int64 customerIdMSB = 14; + int64 customerIdLSB = 15; } enum SessionEvent { @@ -110,6 +112,8 @@ message DeviceInfoProto { string additionalInfo = 7; int64 deviceProfileIdMSB = 8; int64 deviceProfileIdLSB = 9; + int64 customerIdMSB = 10; + int64 customerIdLSB = 11; } /** @@ -656,4 +660,6 @@ message ToUsageStatsServiceMsg { int64 entityIdMSB = 3; int64 entityIdLSB = 4; repeated UsageStatsKVProto values = 5; + int64 customerIdMSB = 6; + int64 customerIdLSB = 7; } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java index ed0a1d16e6..d3ccf0e86d 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java @@ -141,6 +141,8 @@ public class LwM2mTransportContextServer extends TransportContext { .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerIdMSB()) + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerIdLSB()) .setDeviceName(msg.getDeviceInfo().getDeviceName()) .setDeviceType(msg.getDeviceInfo().getDeviceType()) .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB()) diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java index 05f386ad1a..4a432ca131 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java @@ -1177,6 +1177,8 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerIdMSB()) + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerIdLSB()) .setDeviceName(msg.getDeviceInfo().getDeviceName()) .setDeviceType(msg.getDeviceInfo().getDeviceType()) .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB()) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index a7d5499cfb..e8d9a6b6d4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -45,6 +45,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits()) .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits()) + .setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits()) .setDeviceName(deviceInfo.getDeviceName()) .setDeviceType(deviceInfo.getDeviceType()) .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits()) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java index b175ca8580..63d19b9c30 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java @@ -40,6 +40,8 @@ public class SessionInfoCreator { .setDeviceIdLSB(msg.getDeviceInfo().getDeviceId().getId().getLeastSignificantBits()) .setTenantIdMSB(msg.getDeviceInfo().getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(msg.getDeviceInfo().getTenantId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerId().getId().getMostSignificantBits()) + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerId().getId().getLeastSignificantBits()) .setDeviceName(msg.getDeviceInfo().getDeviceName()) .setDeviceType(msg.getDeviceInfo().getDeviceType()) .setDeviceProfileIdMSB(msg.getDeviceInfo().getDeviceProfileId().getId().getMostSignificantBits()) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java index 20d01420c4..27d42d867f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.transport.auth; import lombok.Data; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; @@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId; public class TransportDeviceInfo { private TenantId tenantId; + private CustomerId customerId; private DeviceProfileId deviceProfileId; private DeviceId deviceId; private String deviceName; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index fa28627882..1efe4edcea 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -33,8 +33,10 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; @@ -359,6 +361,7 @@ public class DefaultTransportService implements TransportService { private TransportDeviceInfo getTransportDeviceInfo(TransportProtos.DeviceInfoProto di) { TransportDeviceInfo tdi = new TransportDeviceInfo(); tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB()))); + tdi.setCustomerId(new CustomerId(new UUID(di.getCustomerIdMSB(), di.getCustomerIdLSB()))); tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB()))); tdi.setDeviceProfileId(new DeviceProfileId(new UUID(di.getDeviceProfileIdMSB(), di.getDeviceProfileIdLSB()))); tdi.setAdditionalInfo(di.getAdditionalInfo()); @@ -403,17 +406,17 @@ public class DefaultTransportService implements TransportService { } if (checkLimits(sessionInfo, msg, callback, dataPoints)) { reportActivityInternal(sessionInfo); - TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); + TenantId tenantId = getTenantId(sessionInfo); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); - MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, dataPoints, callback)); + CustomerId customerId = getCustomerId(sessionInfo); + MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, customerId, dataPoints, callback)); for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("ts", tsKv.getTs() + ""); JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); - sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback); - + sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback); } } } @@ -422,15 +425,16 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) { reportActivityInternal(sessionInfo); - TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); + TenantId tenantId = getTenantId(sessionInfo); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("notifyDevice", "false"); - sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, - new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, msg.getKvList().size(), callback))); + CustomerId customerId = getCustomerId(sessionInfo); + sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, + new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback))); } } @@ -439,7 +443,7 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setGetAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + .setGetAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -448,8 +452,8 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -458,8 +462,8 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToRPC(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -467,8 +471,8 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setToDeviceRPCCallResponse(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -511,7 +515,7 @@ public class DefaultTransportService implements TransportService { metaData.putValue("requestId", Integer.toString(msg.getRequestId())); metaData.putValue("serviceId", serviceInfoProvider.getServiceId()); metaData.putValue("sessionId", sessionId.toString()); - sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, + sendToRuleEngine(tenantId, deviceId, getCustomerId(sessionInfo), sessionInfo, json, metaData, SessionMsgType.TO_SERVER_RPC_REQUEST, new TransportTbQueueCallback(callback)); String requestId = sessionId + "-" + msg.getRequestId(); toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId())); @@ -809,6 +813,16 @@ public class DefaultTransportService implements TransportService { return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); } + protected CustomerId getCustomerId(TransportProtos.SessionInfoProto sessionInfo) { + long msb = sessionInfo.getCustomerIdMSB(); + long lsb = sessionInfo.getCustomerIdLSB(); + if (msb != 0 && lsb != 0) { + return new CustomerId(new UUID(msb, lsb)); + } else { + return new CustomerId(EntityId.NULL_UUID); + } + } + protected DeviceId getDeviceId(TransportProtos.SessionInfoProto sessionInfo) { return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); } @@ -847,7 +861,7 @@ public class DefaultTransportService implements TransportService { ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); } - private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, + private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) { DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId); @@ -864,7 +878,7 @@ public class DefaultTransportService implements TransportService { queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN; } - TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, metaData, gson.toJson(json), ruleChainId, null); + TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null); sendToRuleEngine(tenantId, tbMsg, callback); } @@ -934,11 +948,13 @@ public class DefaultTransportService implements TransportService { private class ApiStatsProxyCallback implements TransportServiceCallback { private final TenantId tenantId; + private final CustomerId customerId; private final int dataPoints; private final TransportServiceCallback callback; - public ApiStatsProxyCallback(TenantId tenantId, int dataPoints, TransportServiceCallback callback) { + public ApiStatsProxyCallback(TenantId tenantId, CustomerId customerId, int dataPoints, TransportServiceCallback callback) { this.tenantId = tenantId; + this.customerId = customerId; this.dataPoints = dataPoints; this.callback = callback; } @@ -946,8 +962,8 @@ public class DefaultTransportService implements TransportService { @Override public void onSuccess(T msg) { try { - apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1); - apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints); } finally { callback.onSuccess(msg); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java index 8f13900a6a..89690e5a3b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java @@ -43,6 +43,7 @@ import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.Validator; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantDao; +import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.dao.user.UserService; import java.io.IOException; @@ -79,6 +80,9 @@ public class CustomerServiceImpl extends AbstractEntityService implements Custom @Autowired private DashboardService dashboardService; + @Autowired + private ApiUsageStateService apiUsageStateService; + @Autowired @Lazy private TbTenantProfileCache tenantProfileCache; @@ -128,6 +132,7 @@ public class CustomerServiceImpl extends AbstractEntityService implements Custom edgeService.unassignCustomerEdges(customer.getTenantId(), customerId); userService.deleteCustomerUsers(customer.getTenantId(), customerId); deleteEntityRelations(tenantId, customerId); + apiUsageStateService.deleteApiUsageStateByEntityId(customerId); customerDao.removeById(tenantId, customerId.getId()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index ac4d2b9553..9c7e8de01d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.ApiUsageStateFilter; import org.thingsboard.server.common.data.query.AssetSearchQueryFilter; import org.thingsboard.server.common.data.query.AssetTypeFilter; import org.thingsboard.server.common.data.query.DeviceSearchQueryFilter; @@ -219,8 +220,9 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { " THEN (select additional_info from edge where id = entity_id)" + " END as additional_info"; - private static final String SELECT_API_USAGE_STATE = "(select aus.id, aus.created_time, aus.tenant_id, '13814000-1dd2-11b2-8080-808080808080'::uuid as customer_id, " + - "(select title from tenant where id = aus.tenant_id) as name from api_usage_state as aus)"; + private static final String SELECT_API_USAGE_STATE = "(select aus.id, aus.created_time, aus.tenant_id, aus.entity_id, " + + "coalesce((select title from tenant where id = aus.entity_id), (select title from customer where id = aus.entity_id)) as name " + + "from api_usage_state as aus)"; static { entityTableMap.put(EntityType.ASSET, "asset"); @@ -466,6 +468,22 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { case ENTITY_VIEW_SEARCH_QUERY: case EDGE_SEARCH_QUERY: return this.defaultPermissionQuery(ctx); + case API_USAGE_STATE: + CustomerId filterCustomerId = ((ApiUsageStateFilter) entityFilter).getCustomerId(); + if (ctx.getCustomerId() != null && !ctx.getCustomerId().isNullUid()) { + if (filterCustomerId != null && !filterCustomerId.equals(ctx.getCustomerId())) { + throw new SecurityException("Customer is not allowed to query other customer's data"); + } + filterCustomerId = ctx.getCustomerId(); + } + + ctx.addUuidParameter("permissions_tenant_id", ctx.getTenantId().getId()); + if (filterCustomerId != null) { + ctx.addUuidParameter("permissions_customer_id", filterCustomerId.getId()); + return "e.tenant_id=:permissions_tenant_id and e.entity_id=:permissions_customer_id"; + } else { + return "e.tenant_id=:permissions_tenant_id and e.entity_id=:permissions_tenant_id"; + } default: if (ctx.getEntityType() == EntityType.TENANT) { ctx.addUuidParameter("permissions_tenant_id", ctx.getTenantId().getId()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java index 46cc6eaa83..12702a4e0d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java @@ -33,8 +33,14 @@ public interface ApiUsageStateRepository extends CrudRepository { */ ApiUsageState findTenantApiUsageState(UUID tenantId); + ApiUsageState findApiUsageStateByEntityId(EntityId entityId); + /** * Delete usage record by tenantId. * * @param tenantId the tenantId */ void deleteApiUsageStateByTenantId(TenantId tenantId); + + void deleteApiUsageStateByEntityId(EntityId entityId); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java index f263de1505..7f928d6c41 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -40,6 +41,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -68,12 +70,20 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A } @Override - public ApiUsageState createDefaultApiUsageState(TenantId tenantId) { - log.trace("Executing createDefaultUsageRecord [{}]", tenantId); + public void deleteApiUsageStateByEntityId(EntityId entityId) { + log.trace("Executing deleteApiUsageStateByEntityId [{}]", entityId); + validateId(entityId.getId(), "Invalid entity id"); + apiUsageStateDao.deleteApiUsageStateByEntityId(entityId); + } + + @Override + public ApiUsageState createDefaultApiUsageState(TenantId tenantId, EntityId entityId) { + entityId = Objects.requireNonNullElse(entityId, tenantId); + log.trace("Executing createDefaultUsageRecord [{}]", entityId); validateId(tenantId, INCORRECT_TENANT_ID + tenantId); ApiUsageState apiUsageState = new ApiUsageState(); apiUsageState.setTenantId(tenantId); - apiUsageState.setEntityId(tenantId); + apiUsageState.setEntityId(entityId); apiUsageState.setTransportState(ApiUsageStateValue.ENABLED); apiUsageState.setReExecState(ApiUsageStateValue.ENABLED); apiUsageState.setJsExecState(ApiUsageStateValue.ENABLED); @@ -84,9 +94,6 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A ApiUsageState saved = apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState); - Tenant tenant = tenantDao.findById(tenantId, tenantId.getId()); - TenantProfile tenantProfile = tenantProfileDao.findById(tenantId, tenant.getTenantProfileId().getId()); - TenantProfileConfiguration configuration = tenantProfile.getProfileData().getConfiguration(); List apiUsageStates = new ArrayList<>(); apiUsageStates.add(new BasicTsKvEntry(saved.getCreatedTime(), new StringDataEntry(ApiFeature.TRANSPORT.getApiStateKey(), ApiUsageStateValue.ENABLED.name()))); @@ -102,12 +109,19 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A new StringDataEntry(ApiFeature.SMS.getApiStateKey(), ApiUsageStateValue.ENABLED.name()))); tsService.save(tenantId, saved.getId(), apiUsageStates, 0L); - List profileThresholds = new ArrayList<>(); - - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - profileThresholds.add(new BasicTsKvEntry(saved.getCreatedTime(), new LongDataEntry(key.getApiLimitKey(), configuration.getProfileThreshold(key)))); + if (entityId.getEntityType() == EntityType.TENANT && !entityId.equals(TenantId.SYS_TENANT_ID)) { + tenantId = (TenantId) entityId; + Tenant tenant = tenantDao.findById(tenantId, tenantId.getId()); + TenantProfile tenantProfile = tenantProfileDao.findById(tenantId, tenant.getTenantProfileId().getId()); + TenantProfileConfiguration configuration = tenantProfile.getProfileData().getConfiguration(); + + List profileThresholds = new ArrayList<>(); + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + profileThresholds.add(new BasicTsKvEntry(saved.getCreatedTime(), new LongDataEntry(key.getApiLimitKey(), configuration.getProfileThreshold(key)))); + } + tsService.save(tenantId, saved.getId(), profileThresholds, 0L); } - tsService.save(tenantId, saved.getId(), profileThresholds, 0L); + return saved; } @@ -126,6 +140,12 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A return apiUsageStateDao.findTenantApiUsageState(tenantId.getId()); } + @Override + public ApiUsageState findApiUsageStateByEntityId(EntityId entityId) { + validateId(entityId.getId(), "Invalid entity id"); + return apiUsageStateDao.findApiUsageStateByEntityId(entityId); + } + @Override public ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id) { log.trace("Executing findApiUsageStateById, tenantId [{}], apiUsageStateId [{}]", tenantId, id); @@ -142,16 +162,14 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A throw new DataValidationException("ApiUsageState should be assigned to tenant!"); } else { Tenant tenant = tenantDao.findById(requestTenantId, apiUsageState.getTenantId().getId()); - if (tenant == null) { - throw new DataValidationException("Asset is referencing to non-existent tenant!"); + if (tenant == null && !requestTenantId.equals(TenantId.SYS_TENANT_ID)) { + throw new DataValidationException("ApiUsageState is referencing to non-existent tenant!"); } } if (apiUsageState.getEntityId() == null) { throw new DataValidationException("UsageRecord should be assigned to entity!"); - } else if (!EntityType.TENANT.equals(apiUsageState.getEntityId().getEntityType())) { - throw new DataValidationException("Only Tenant Usage Records are supported!"); - } else if (!apiUsageState.getTenantId().getId().equals(apiUsageState.getEntityId().getId())) { - throw new DataValidationException("Can't assign one Usage Record to multiple tenants!"); + } else if (apiUsageState.getEntityId().getEntityType() != EntityType.TENANT && apiUsageState.getEntityId().getEntityType() != EntityType.CUSTOMER) { + throw new DataValidationException("Only Tenant and Customer Usage Records are supported!"); } } }; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java index 7160c150f4..0b957c8b23 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.ApiFeature; import org.thingsboard.server.common.data.ApiUsageStateMailMessage; import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import javax.mail.MessagingException; @@ -40,7 +41,7 @@ public interface MailService { void sendPasswordWasResetEmail(String loginLink, String email) throws ThingsboardException; - void send(TenantId tenantId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException; + void send(TenantId tenantId, CustomerId customerId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException; void sendAccountLockoutEmail( String lockoutEmail, String email, Integer maxFailedLoginAttempts) throws ThingsboardException; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index 224a10f701..e641c83144 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -31,7 +32,7 @@ public interface RuleEngineTelemetryService { void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); - void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback); + void saveAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List ts, long ttl, FutureCallback callback); void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java index e0190177f0..caf96a68e6 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.api; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.sms.config.TestSmsRequest; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.TenantId; @@ -23,7 +24,7 @@ public interface SmsService { void updateSmsConfiguration(); - void sendSms(TenantId tenantId, String[] numbersTo, String message) throws ThingsboardException;; + void sendSms(TenantId tenantId, CustomerId customerId, String[] numbersTo, String message) throws ThingsboardException;; void sendTestSms(TestSmsRequest testSmsRequest) throws ThingsboardException; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index e4a71ee33d..7a0e3cce63 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -16,16 +16,15 @@ package org.thingsboard.rule.engine.api; import io.netty.channel.EventLoopGroup; -import org.springframework.data.redis.core.RedisTemplate; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; -import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; @@ -144,6 +143,8 @@ public interface TbContext { TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data); + TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data); + TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data); TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java index 8282c74159..60b25574d4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java @@ -136,7 +136,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode { } private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) { - ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS); + ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS); } private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java index ca98e104c5..e7d018f65f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java @@ -63,7 +63,7 @@ public class TbMsgCountNode implements TbNode { TbMsgCountNodeConfiguration config = TbNodeUtils.convert(configuration, TbMsgCountNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getInterval()); this.telemetryPrefix = config.getTelemetryPrefix(); - scheduleTickMsg(ctx); + scheduleTickMsg(ctx, null); } @@ -78,23 +78,23 @@ public class TbMsgCountNode implements TbNode { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay)); - TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson)); + TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson)); ctx.enqueueForTellNext(tbMsg, SUCCESS); - scheduleTickMsg(ctx); + scheduleTickMsg(ctx, tbMsg); } else { messagesProcessed.incrementAndGet(); ctx.ack(msg); } } - private void scheduleTickMsg(TbContext ctx) { + private void scheduleTickMsg(TbContext ctx, TbMsg msg) { long curTs = System.currentTimeMillis(); if (lastScheduledTs == 0L) { lastScheduledTs = curTs; } lastScheduledTs = lastScheduledTs + delay; long curDelay = Math.max(0L, (lastScheduledTs - curTs)); - TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), msg != null ? msg.getCustomerId() : null, new TbMsgMetaData(), ""); nextTickId = tickMsg.getId(); ctx.tellSelf(tickMsg, curDelay); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index fc6ee959e0..b5a47b4a0d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -100,7 +100,7 @@ public class TbMsgGeneratorNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { - withCallback(generate(ctx), + withCallback(generate(ctx, msg), m -> { if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { ctx.enqueueForTellNext(m, SUCCESS); @@ -130,16 +130,16 @@ public class TbMsgGeneratorNode implements TbNode { ctx.tellSelf(tickMsg, curDelay); } - private ListenableFuture generate(TbContext ctx) { + private ListenableFuture generate(TbContext ctx, TbMsg msg) { return ctx.getJsExecutor().executeAsync(() -> { if (prevMsg == null) { - prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, new TbMsgMetaData(), "{}"); + prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}"); } if (initialized) { ctx.logJsEvalRequest(); TbMsg generated = jsEngine.executeGenerate(prevMsg); ctx.logJsEvalResponse(); - prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, generated.getMetaData(), generated.getData()); + prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); } return prevMsg; }); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 730cb734be..4a3113baca 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -70,7 +70,7 @@ public class TbMsgDelayNode implements TbNode { } else { if (pendingMsgs.size() < config.getMaxPendingMsgs()) { pendingMsgs.put(msg.getId(), msg); - TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString()); + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), msg.getCustomerId(), new TbMsgMetaData(), msg.getId().toString()); ctx.tellSelf(tickMsg, getDelay(msg)); ctx.ack(msg); } else { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java index 22aefd16f1..45e6efadb2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java @@ -26,7 +26,6 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -76,7 +75,7 @@ public class TbSendEmailNode implements TbNode { validateType(msg.getType()); EmailPojo email = getEmail(msg); withCallback(ctx.getMailExecutor().executeAsync(() -> { - sendEmail(ctx, email); + sendEmail(ctx, msg, email); return null; }), ok -> ctx.tellSuccess(msg), @@ -86,9 +85,9 @@ public class TbSendEmailNode implements TbNode { } } - private void sendEmail(TbContext ctx, EmailPojo email) throws Exception { + private void sendEmail(TbContext ctx, TbMsg msg, EmailPojo email) throws Exception { if (this.config.isUseSystemSmtpSettings()) { - ctx.getMailService().send(ctx.getTenantId(), email.getFrom(), email.getTo(), email.getCc(), + ctx.getMailService().send(ctx.getTenantId(), msg.getCustomerId(), email.getFrom(), email.getTo(), email.getCc(), email.getBcc(), email.getSubject(), email.getBody()); } else { MimeMessage mailMsg = mailSender.createMimeMessage(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java index ceee362fc8..445b9653a2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java @@ -74,15 +74,15 @@ class AlarmState { lastMsgMetaData = msg.getMetaData(); lastMsgQueueName = msg.getQueueName(); this.dataSnapshot = data; - return createOrClearAlarms(ctx, data, update, AlarmRuleState::eval); + return createOrClearAlarms(ctx, msg, data, update, AlarmRuleState::eval); } public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { initCurrentAlarm(ctx); - return createOrClearAlarms(ctx, ts, null, AlarmRuleState::eval); + return createOrClearAlarms(ctx, null, ts, null, AlarmRuleState::eval); } - public boolean createOrClearAlarms(TbContext ctx, T data, SnapshotUpdate update, BiFunction evalFunction) { + public boolean createOrClearAlarms(TbContext ctx, TbMsg msg, T data, SnapshotUpdate update, BiFunction evalFunction) { boolean stateUpdate = false; AlarmRuleState resultState = null; log.debug("[{}] processing update: {}", alarmDefinition.getId(), data); @@ -103,7 +103,7 @@ class AlarmState { if (resultState != null) { TbAlarmResult result = calculateAlarmResult(ctx, resultState); if (result != null) { - pushMsg(ctx, result, resultState); + pushMsg(ctx, msg, result, resultState); } stateUpdate = clearAlarmState(stateUpdate, clearState); } else if (currentAlarm != null && clearState != null) { @@ -122,7 +122,7 @@ class AlarmState { ); DonAsynchron.withCallback(alarmClearOperationResult, result -> { - pushMsg(ctx, new TbAlarmResult(false, false, true, result.getAlarm()), clearState); + pushMsg(ctx, msg, new TbAlarmResult(false, false, true, result.getAlarm()), clearState); }, throwable -> { throw new RuntimeException(throwable); @@ -165,7 +165,7 @@ class AlarmState { } } - public void pushMsg(TbContext ctx, TbAlarmResult alarmResult, AlarmRuleState ruleState) { + public void pushMsg(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult, AlarmRuleState ruleState) { JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm()); String data = jsonNodes.toString(); TbMsgMetaData metaData = lastMsgMetaData != null ? lastMsgMetaData.copy() : new TbMsgMetaData(); @@ -185,7 +185,8 @@ class AlarmState { metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); } setAlarmConditionMetadata(ruleState, metaData); - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", originator, metaData, data); + TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", + originator, msg != null ? msg.getCustomerId() : null, metaData, data); ctx.tellNext(newMsg, relationType); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java index a733fd104a..ebf72d6b3a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java @@ -74,7 +74,7 @@ public class TbDeviceProfileNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class); this.cache = ctx.getDeviceProfileCache(); this.ctx = ctx; - scheduleAlarmHarvesting(ctx); + scheduleAlarmHarvesting(ctx, null); ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate); if (config.isFetchAlarmRulesStateOnStart()) { log.info("[{}] Fetching alarm rule state", ctx.getSelfId()); @@ -108,7 +108,7 @@ public class TbDeviceProfileNode implements TbNode { public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { EntityType originatorType = msg.getOriginator().getEntityType(); if (msg.getType().equals(PERIODIC_MSG_TYPE)) { - scheduleAlarmHarvesting(ctx); + scheduleAlarmHarvesting(ctx, msg); harvestAlarms(ctx, System.currentTimeMillis()); } else if (msg.getType().equals(PROFILE_UPDATE_MSG_TYPE)) { updateProfile(ctx, new DeviceProfileId(UUID.fromString(msg.getData()))); @@ -168,8 +168,8 @@ public class TbDeviceProfileNode implements TbNode { return deviceState; } - protected void scheduleAlarmHarvesting(TbContext ctx) { - TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, "{}"); + protected void scheduleAlarmHarvesting(TbContext ctx, TbMsg msg) { + TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), msg != null ? msg.getCustomerId() : null, TbMsgMetaData.EMPTY, "{}"); ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1)); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 07952359a1..31bd913f47 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -16,10 +16,6 @@ package org.thingsboard.rule.engine.rpc; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -116,10 +112,10 @@ public class TbSendRPCRequestNode implements TbNode { ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); } else { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); } }); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java index df75afe086..22d293c69a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java @@ -76,7 +76,7 @@ public class TbSendSmsNode implements TbNode { String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg); String[] numbersToList = numbersTo.split(","); if (this.config.isUseSystemSmsSettings()) { - ctx.getSmsService().sendSms(ctx.getTenantId(), numbersToList, message); + ctx.getSmsService().sendSms(ctx.getTenantId(), msg.getCustomerId(), numbersToList, message); } else { for (String numberTo : numbersToList) { this.smsSender.sendSms(numberTo, message); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index dc657cfce6..c99a575cc3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -93,7 +94,7 @@ public class TbMsgTimeseriesNode implements TbNode { if (ttl == 0L) { ttl = tenantProfileDefaultStorageTtl; } - ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); + ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); } public static long getTs(TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java index 6a30eb63ab..faf934cd5e 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java @@ -183,7 +183,7 @@ public class TbDeviceProfileNodeTest { Mockito.when(alarmService.createOrUpdateAlarm(Mockito.any())).thenAnswer(AdditionalAnswers.returnsFirstArg()); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(theMsg); + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(theMsg); ObjectNode data = mapper.createObjectNode(); data.put("temperature", 42); @@ -195,7 +195,7 @@ public class TbDeviceProfileNodeTest { verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any()); TbMsg theMsg2 = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), "2"); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(theMsg2); + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(theMsg2); TbMsg msg2 = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(), @@ -274,7 +274,7 @@ public class TbDeviceProfileNodeTest { .thenReturn(attrListListenableFuture); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())) + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())) .thenReturn(theMsg); ObjectNode data = mapper.createObjectNode(); @@ -361,7 +361,7 @@ public class TbDeviceProfileNodeTest { .thenReturn(attrListListenableFuture); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())) + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())) .thenReturn(theMsg); ObjectNode data = mapper.createObjectNode(); @@ -430,7 +430,7 @@ public class TbDeviceProfileNodeTest { .thenReturn(listListenableFutureWithLess); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())) + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())) .thenReturn(theMsg); ObjectNode data = mapper.createObjectNode(); @@ -510,7 +510,7 @@ public class TbDeviceProfileNodeTest { .thenReturn(optionalListenableFutureWithLess); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())) + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())) .thenReturn(theMsg); ObjectNode data = mapper.createObjectNode(); @@ -584,7 +584,7 @@ public class TbDeviceProfileNodeTest { .thenReturn(optionalListenableFutureWithLess); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())) + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())) .thenReturn(theMsg); ObjectNode data = mapper.createObjectNode(); @@ -668,7 +668,7 @@ public class TbDeviceProfileNodeTest { .thenReturn(optionalListenableFutureWithLess); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())) + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())) .thenReturn(theMsg); ObjectNode data = mapper.createObjectNode(); @@ -754,7 +754,7 @@ public class TbDeviceProfileNodeTest { .thenReturn(optionalListenableFutureWithLess); TbMsg theMsg = TbMsg.newMsg("ALARM", deviceId, new TbMsgMetaData(), ""); - Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyString())) + Mockito.when(ctx.newMsg(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyString())) .thenReturn(theMsg); ObjectNode data = mapper.createObjectNode();