Browse Source

Draft of the review

pull/12487/head
Andrii Shvaika 1 year ago
parent
commit
7d8a76ce7f
  1. 10
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java
  2. 9
      application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java
  3. 20
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  4. 7
      common/proto/src/main/proto/queue.proto

10
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java

@ -122,16 +122,14 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
@Override
public List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId) {
List<CalculatedFieldLink> cfLinks = entityIdCalculatedFieldLinks.get(entityId);
if (cfLinks == null || cfLinks.isEmpty()) {
if (cfLinks == null) {
calculatedFieldFetchLock.lock();
try {
cfLinks = entityIdCalculatedFieldLinks.get(entityId);
if (cfLinks == null || cfLinks.isEmpty()) {
if (cfLinks == null) {
cfLinks = calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId);
if (cfLinks != null) {
entityIdCalculatedFieldLinks.put(entityId, cfLinks);
log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks);
}
entityIdCalculatedFieldLinks.put(entityId, cfLinks);
log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks);
}
} finally {
calculatedFieldFetchLock.unlock();

9
application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java

@ -38,6 +38,7 @@ import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@ -99,7 +100,11 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList
}
protected <T> void addWsCallback(ListenableFuture<T> saveFuture, Consumer<T> callback) {
Futures.addCallback(saveFuture, new FutureCallback<T>() {
addCallback(saveFuture, callback, wsCallBackExecutor);
}
protected <T> void addCallback(ListenableFuture<T> saveFuture, Consumer<T> callback, Executor executor) {
Futures.addCallback(saveFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T result) {
callback.accept(result);
@ -108,7 +113,7 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList
@Override
public void onFailure(Throwable t) {
}
}, wsCallBackExecutor);
}, executor);
}
}

20
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -154,7 +154,9 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
if (request.isSaveLatest() && !request.isOnlyLatest()) {
addEntityViewCallback(tenantId, entityId, request.getEntries());
}
addCalculatedFieldCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())));
// Use something very similar to addMainCallback. don't forget about tsCallBackExecutor.
//CalculatedFieldTimeSeriesUpdateRequest - add constructor that accepts the TimeseriesSaveRequest
addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor);
return saveFuture;
}
@ -170,7 +172,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
ListenableFuture<List<Long>> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries());
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice()));
addCalculatedFieldCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries(), request.getPreviousCalculatedFieldIds())));
//CalculatedFieldAttributeUpdateRequest - add constructor that accepts the AttributesSaveRequest
addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor);
}
@Override
@ -343,17 +346,4 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
};
}
protected <T> void addCalculatedFieldCallback(ListenableFuture<T> saveFuture, Consumer<T> callback) {
Futures.addCallback(saveFuture, new FutureCallback<T>() {
@Override
public void onSuccess(@Nullable T result) {
callback.accept(result);
}
@Override
public void onFailure(Throwable t) {
}
}, tsCallBackExecutor);
}
}

7
common/proto/src/main/proto/queue.proto

@ -809,6 +809,13 @@ message ProfileEntityMsgProto {
bool deleted = 10;
}
message ToServerB {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
repeated CfIdEntityIdPair links = 3;
value = 4;
}
message CalculatedFieldStateMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;

Loading…
Cancel
Save