14 changed files with 652 additions and 300 deletions
@ -0,0 +1,315 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.subscription; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.id.UserId; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.common.data.query.ComplexFilterPredicate; |
|||
import org.thingsboard.server.common.data.query.DynamicValue; |
|||
import org.thingsboard.server.common.data.query.DynamicValueSourceType; |
|||
import org.thingsboard.server.common.data.query.EntityCountQuery; |
|||
import org.thingsboard.server.common.data.query.EntityKeyType; |
|||
import org.thingsboard.server.common.data.query.FilterPredicateType; |
|||
import org.thingsboard.server.common.data.query.KeyFilter; |
|||
import org.thingsboard.server.common.data.query.KeyFilterPredicate; |
|||
import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate; |
|||
import org.thingsboard.server.common.data.query.TsValue; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.entity.EntityService; |
|||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; |
|||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; |
|||
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.ExecutionException; |
|||
import java.util.concurrent.ScheduledFuture; |
|||
|
|||
@Slf4j |
|||
@Data |
|||
public abstract class TbAbstractSubCtx<T extends EntityCountQuery> { |
|||
|
|||
protected final String serviceId; |
|||
protected final SubscriptionServiceStatistics stats; |
|||
protected final TelemetryWebSocketService wsService; |
|||
protected final EntityService entityService; |
|||
protected final TbLocalSubscriptionService localSubscriptionService; |
|||
protected final AttributesService attributesService; |
|||
protected final TelemetryWebSocketSessionRef sessionRef; |
|||
protected final int cmdId; |
|||
protected final Set<Integer> subToDynamicValueKeySet; |
|||
@Getter |
|||
protected final Map<DynamicValueKey, List<DynamicValue>> dynamicValues; |
|||
@Getter |
|||
@Setter |
|||
protected T query; |
|||
@Setter |
|||
protected volatile ScheduledFuture<?> refreshTask; |
|||
|
|||
public TbAbstractSubCtx(String serviceId, TelemetryWebSocketService wsService, |
|||
EntityService entityService, TbLocalSubscriptionService localSubscriptionService, |
|||
AttributesService attributesService, SubscriptionServiceStatistics stats, |
|||
TelemetryWebSocketSessionRef sessionRef, int cmdId) { |
|||
this.serviceId = serviceId; |
|||
this.wsService = wsService; |
|||
this.entityService = entityService; |
|||
this.localSubscriptionService = localSubscriptionService; |
|||
this.attributesService = attributesService; |
|||
this.stats = stats; |
|||
this.sessionRef = sessionRef; |
|||
this.cmdId = cmdId; |
|||
this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet(); |
|||
this.dynamicValues = new ConcurrentHashMap<>(); |
|||
} |
|||
|
|||
public void setAndResolveQuery(T query) { |
|||
dynamicValues.clear(); |
|||
this.query = query; |
|||
if (query != null && query.getKeyFilters() != null) { |
|||
for (KeyFilter filter : query.getKeyFilters()) { |
|||
registerDynamicValues(filter.getPredicate()); |
|||
} |
|||
} |
|||
resolve(getTenantId(), getCustomerId(), getUserId()); |
|||
} |
|||
|
|||
public void resolve(TenantId tenantId, CustomerId customerId, UserId userId) { |
|||
List<ListenableFuture<DynamicValueKeySub>> futures = new ArrayList<>(); |
|||
for (DynamicValueKey key : dynamicValues.keySet()) { |
|||
switch (key.getSourceType()) { |
|||
case CURRENT_TENANT: |
|||
futures.add(resolveEntityValue(tenantId, tenantId, key)); |
|||
break; |
|||
case CURRENT_CUSTOMER: |
|||
if (customerId != null && !customerId.isNullUid()) { |
|||
futures.add(resolveEntityValue(tenantId, customerId, key)); |
|||
} |
|||
break; |
|||
case CURRENT_USER: |
|||
if (userId != null && !userId.isNullUid()) { |
|||
futures.add(resolveEntityValue(tenantId, userId, key)); |
|||
} |
|||
break; |
|||
} |
|||
} |
|||
try { |
|||
Map<EntityId, Map<String, DynamicValueKeySub>> tmpSubMap = new HashMap<>(); |
|||
for (DynamicValueKeySub sub : Futures.successfulAsList(futures).get()) { |
|||
tmpSubMap.computeIfAbsent(sub.getEntityId(), tmp -> new HashMap<>()).put(sub.getKey().getSourceAttribute(), sub); |
|||
} |
|||
for (EntityId entityId : tmpSubMap.keySet()) { |
|||
Map<String, Long> keyStates = new HashMap<>(); |
|||
Map<String, DynamicValueKeySub> dynamicValueKeySubMap = tmpSubMap.get(entityId); |
|||
dynamicValueKeySubMap.forEach((k, v) -> keyStates.put(k, v.getLastUpdateTs())); |
|||
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); |
|||
TbAttributeSubscription sub = TbAttributeSubscription.builder() |
|||
.serviceId(serviceId) |
|||
.sessionId(sessionRef.getSessionId()) |
|||
.subscriptionId(subIdx) |
|||
.tenantId(sessionRef.getSecurityCtx().getTenantId()) |
|||
.entityId(entityId) |
|||
.updateConsumer((s, subscriptionUpdate) -> dynamicValueSubUpdate(s, subscriptionUpdate, dynamicValueKeySubMap)) |
|||
.allKeys(false) |
|||
.keyStates(keyStates) |
|||
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE) |
|||
.build(); |
|||
subToDynamicValueKeySet.add(subIdx); |
|||
localSubscriptionService.addSubscription(sub); |
|||
} |
|||
} catch (InterruptedException | ExecutionException e) { |
|||
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet()); |
|||
} |
|||
|
|||
} |
|||
|
|||
private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, |
|||
Map<String, DynamicValueKeySub> dynamicValueKeySubMap) { |
|||
Map<String, TsValue> latestUpdate = new HashMap<>(); |
|||
subscriptionUpdate.getData().forEach((k, v) -> { |
|||
Object[] data = (Object[]) v.get(0); |
|||
latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); |
|||
}); |
|||
|
|||
boolean invalidateFilter = false; |
|||
for (Map.Entry<String, TsValue> entry : latestUpdate.entrySet()) { |
|||
String k = entry.getKey(); |
|||
TsValue tsValue = entry.getValue(); |
|||
DynamicValueKeySub sub = dynamicValueKeySubMap.get(k); |
|||
if (sub.updateValue(tsValue)) { |
|||
invalidateFilter = true; |
|||
updateDynamicValuesByKey(sub, tsValue); |
|||
} |
|||
} |
|||
|
|||
if (invalidateFilter) { |
|||
update(); |
|||
} |
|||
} |
|||
|
|||
public abstract void fetchData(); |
|||
|
|||
protected abstract void update(); |
|||
|
|||
public void clearSubscriptions() { |
|||
clearDynamicValueSubscriptions(); |
|||
} |
|||
|
|||
@Data |
|||
private static class DynamicValueKeySub { |
|||
private final DynamicValueKey key; |
|||
private final EntityId entityId; |
|||
private long lastUpdateTs; |
|||
private String lastUpdateValue; |
|||
|
|||
boolean updateValue(TsValue value) { |
|||
if (value.getTs() > lastUpdateTs && (lastUpdateValue == null || !lastUpdateValue.equals(value.getValue()))) { |
|||
this.lastUpdateTs = value.getTs(); |
|||
this.lastUpdateValue = value.getValue(); |
|||
return true; |
|||
} else { |
|||
return false; |
|||
} |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<DynamicValueKeySub> resolveEntityValue(TenantId tenantId, EntityId entityId, DynamicValueKey key) { |
|||
ListenableFuture<Optional<AttributeKvEntry>> entry = attributesService.find(tenantId, entityId, |
|||
TbAttributeSubscriptionScope.SERVER_SCOPE.name(), key.getSourceAttribute()); |
|||
return Futures.transform(entry, attributeOpt -> { |
|||
DynamicValueKeySub sub = new DynamicValueKeySub(key, entityId); |
|||
if (attributeOpt.isPresent()) { |
|||
AttributeKvEntry attribute = attributeOpt.get(); |
|||
sub.setLastUpdateTs(attribute.getLastUpdateTs()); |
|||
sub.setLastUpdateValue(attribute.getValueAsString()); |
|||
updateDynamicValuesByKey(sub, new TsValue(attribute.getLastUpdateTs(), attribute.getValueAsString())); |
|||
} |
|||
return sub; |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
@SuppressWarnings("unchecked") |
|||
protected void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) { |
|||
DynamicValueKey dvk = sub.getKey(); |
|||
switch (dvk.getPredicateType()) { |
|||
case STRING: |
|||
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(tsValue.getValue())); |
|||
break; |
|||
case NUMERIC: |
|||
try { |
|||
Double dValue = Double.parseDouble(tsValue.getValue()); |
|||
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(dValue)); |
|||
} catch (NumberFormatException e) { |
|||
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(null)); |
|||
} |
|||
break; |
|||
case BOOLEAN: |
|||
Boolean bValue = Boolean.parseBoolean(tsValue.getValue()); |
|||
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(bValue)); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
@SuppressWarnings("unchecked") |
|||
private void registerDynamicValues(KeyFilterPredicate predicate) { |
|||
switch (predicate.getType()) { |
|||
case STRING: |
|||
case NUMERIC: |
|||
case BOOLEAN: |
|||
Optional<DynamicValue> value = getDynamicValueFromSimplePredicate((SimpleKeyFilterPredicate) predicate); |
|||
if (value.isPresent()) { |
|||
DynamicValue dynamicValue = value.get(); |
|||
DynamicValueKey key = new DynamicValueKey( |
|||
predicate.getType(), |
|||
dynamicValue.getSourceType(), |
|||
dynamicValue.getSourceAttribute()); |
|||
dynamicValues.computeIfAbsent(key, tmp -> new ArrayList<>()).add(dynamicValue); |
|||
} |
|||
break; |
|||
case COMPLEX: |
|||
((ComplexFilterPredicate) predicate).getPredicates().forEach(this::registerDynamicValues); |
|||
} |
|||
} |
|||
|
|||
private Optional<DynamicValue<T>> getDynamicValueFromSimplePredicate(SimpleKeyFilterPredicate<T> predicate) { |
|||
if (predicate.getValue().getUserValue() == null) { |
|||
return Optional.ofNullable(predicate.getValue().getDynamicValue()); |
|||
} else { |
|||
return Optional.empty(); |
|||
} |
|||
} |
|||
|
|||
public String getSessionId() { |
|||
return sessionRef.getSessionId(); |
|||
} |
|||
|
|||
public TenantId getTenantId() { |
|||
return sessionRef.getSecurityCtx().getTenantId(); |
|||
} |
|||
|
|||
public CustomerId getCustomerId() { |
|||
return sessionRef.getSecurityCtx().getCustomerId(); |
|||
} |
|||
|
|||
public UserId getUserId() { |
|||
return sessionRef.getSecurityCtx().getId(); |
|||
} |
|||
|
|||
protected void clearDynamicValueSubscriptions() { |
|||
if (subToDynamicValueKeySet != null) { |
|||
for (Integer subId : subToDynamicValueKeySet) { |
|||
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); |
|||
} |
|||
subToDynamicValueKeySet.clear(); |
|||
} |
|||
} |
|||
|
|||
public void setRefreshTask(ScheduledFuture<?> task) { |
|||
this.refreshTask = task; |
|||
} |
|||
|
|||
public void cancelTasks() { |
|||
if (this.refreshTask != null) { |
|||
log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId); |
|||
this.refreshTask.cancel(true); |
|||
} |
|||
} |
|||
|
|||
@Data |
|||
public static class DynamicValueKey { |
|||
@Getter |
|||
private final FilterPredicateType predicateType; |
|||
@Getter |
|||
private final DynamicValueSourceType sourceType; |
|||
@Getter |
|||
private final String sourceAttribute; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,55 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.subscription; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.data.query.EntityCountQuery; |
|||
import org.thingsboard.server.common.data.query.EntityKeyType; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.entity.EntityService; |
|||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; |
|||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; |
|||
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate; |
|||
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; |
|||
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; |
|||
|
|||
@Slf4j |
|||
public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> { |
|||
|
|||
private volatile int result; |
|||
|
|||
public TbEntityCountSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService, |
|||
TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, |
|||
SubscriptionServiceStatistics stats, TelemetryWebSocketSessionRef sessionRef, int cmdId) { |
|||
super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId); |
|||
} |
|||
|
|||
@Override |
|||
public void fetchData() { |
|||
result = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query); |
|||
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result)); |
|||
} |
|||
|
|||
@Override |
|||
protected void update() { |
|||
int newCount = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query); |
|||
if (newCount != result) { |
|||
result = newCount; |
|||
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result)); |
|||
} |
|||
} |
|||
|
|||
} |
|||
Loading…
Reference in new issue