Browse Source

Refactoring for AbstractCalculatedFieldProcessingService

pull/14441/head
Viacheslav Klimov 6 months ago
parent
commit
9dfc4da49b
  1. 92
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

92
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -21,7 +21,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import jakarta.annotation.PostConstruct;
@ -428,37 +427,21 @@ public abstract class AbstractCalculatedFieldProcessingService {
JsonElement jsonResult = JsonParser.parseString(Objects.requireNonNull(cfResult.stringValue()));
log.trace("[{}][{}] Saving CF result: {}", tenantId, entityId, jsonResult);
SettableFuture<Void> future = SettableFuture.create();
switch (type) {
case ATTRIBUTES -> saveAttributes(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfResult.getScope(), cfResult.getCalculatedFieldName(), cfIds, future);
case TIME_SERIES -> saveTimeSeries(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfIds, System.currentTimeMillis(), future);
case ATTRIBUTES -> saveAttributes(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfResult.getScope(), cfResult.getCalculatedFieldName(), cfIds, callback);
case TIME_SERIES -> saveTimeSeries(tenantId, entityId, jsonResult, cfResult.getOutputStrategy(), cfIds, System.currentTimeMillis(), callback);
}
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Void v) {
callback.onSuccess();
log.debug("[{}][{}] Saved CF result: {}", tenantId, entityId, cfResult);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
log.error("[{}][{}] Failed to save CF result {}", tenantId, entityId, cfResult, t);
}
}, MoreExecutors.directExecutor());
}
private void saveAttributes(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, AttributeScope scope, String cfName, List<CalculatedFieldId> cfIds, SettableFuture<Void> future) {
private void saveAttributes(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, AttributeScope scope, String cfName, List<CalculatedFieldId> cfIds, TbCallback callback) {
if (!(outputStrategy instanceof AttributesImmediateOutputStrategy attOutputStrategy)) {
future.setException(new IllegalArgumentException("Only AttributeImmediateOutputStrategy is supported."));
callback.onFailure(new IllegalArgumentException("Only AttributeImmediateOutputStrategy is supported."));
} else {
AttributesSaveRequest.Strategy strategy = new Strategy(attOutputStrategy.isSaveAttribute(), attOutputStrategy.isSendWsUpdate(), attOutputStrategy.isProcessCfs());
List<AttributeKvEntry> newAttributes = JsonConverter.convertToAttributes(jsonResult);
if (!attOutputStrategy.isUpdateAttributesOnlyOnValueChange()) {
saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, newAttributes, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), future);
saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, newAttributes, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), callback);
return;
}
@ -469,12 +452,12 @@ public abstract class AbstractCalculatedFieldProcessingService {
existingAttributes -> {
List<AttributeKvEntry> changed = filterChangedAttr(existingAttributes, newAttributes);
if (changed.isEmpty()) {
future.set(null);
callback.onSuccess();
return;
}
saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, changed, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), future);
saveAttributesInternal(tenantId, entityId, scope, cfName, cfIds, changed, strategy, attOutputStrategy.isSendAttributesUpdatedNotification(), callback);
},
future::setException,
callback::onFailure,
MoreExecutors.directExecutor());
}
}
@ -486,10 +469,7 @@ public abstract class AbstractCalculatedFieldProcessingService {
List<AttributeKvEntry> entries,
AttributesSaveRequest.Strategy strategy,
boolean sendAttributesUpdatedNotification,
SettableFuture<Void> future) {
Runnable onSuccess = sendAttributesUpdatedNotification
? () -> sendAttributesUpdatedMsg(tenantId, entityId, scope, cfName, entries)
: null;
TbCallback callback) {
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
@ -497,23 +477,36 @@ public abstract class AbstractCalculatedFieldProcessingService {
.entries(entries)
.strategy(strategy)
.previousCalculatedFieldIds(cfIds)
.callback(wrapWithSuccessHandler(future, onSuccess))
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
if (sendAttributesUpdatedNotification) {
sendAttributesUpdatedMsg(tenantId, entityId, scope, cfName, entries);
}
callback.onSuccess();
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
})
.build());
}
private void saveTimeSeries(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, List<CalculatedFieldId> cfIds, long ts, SettableFuture<Void> future) {
private void saveTimeSeries(TenantId tenantId, EntityId entityId, JsonElement jsonResult, OutputStrategy outputStrategy, List<CalculatedFieldId> cfIds, long ts, TbCallback callback) {
if (!(outputStrategy instanceof TimeSeriesImmediateOutputStrategy tsOutputStrategy)) {
future.setException(new IllegalArgumentException("Only TimeSeriesImmediateOutputStrategy is supported."));
callback.onFailure(new IllegalArgumentException("Only TimeSeriesImmediateOutputStrategy is supported."));
} else {
TimeseriesSaveRequest.Strategy strategy = new TimeseriesSaveRequest.Strategy(tsOutputStrategy.isSaveTimeSeries(), tsOutputStrategy.isSaveLatest(), tsOutputStrategy.isSendWsUpdate(), tsOutputStrategy.isProcessCfs());
saveTimeSeriesInternal(tenantId, entityId, jsonResult, tsOutputStrategy.getTtl(), cfIds, ts, strategy, future);
saveTimeSeriesInternal(tenantId, entityId, jsonResult, tsOutputStrategy.getTtl(), cfIds, ts, strategy, callback);
}
}
private void saveTimeSeriesInternal(TenantId tenantId, EntityId entityId, JsonElement jsonResult, Long ttl, List<CalculatedFieldId> cfIds, long ts, TimeseriesSaveRequest.Strategy strategy, SettableFuture<Void> future) {
private void saveTimeSeriesInternal(TenantId tenantId, EntityId entityId, JsonElement jsonResult, Long ttl, List<CalculatedFieldId> cfIds, long ts, TimeseriesSaveRequest.Strategy strategy, TbCallback callback) {
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(jsonResult, ts);
if (tsKvMap.isEmpty()) {
future.set(null);
callback.onSuccess();
return;
}
List<TsKvEntry> tsEntries = toTsKvEntryList(tsKvMap);
@ -522,7 +515,17 @@ public abstract class AbstractCalculatedFieldProcessingService {
.entityId(entityId)
.entries(tsEntries)
.strategy(strategy)
.future(future);
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
callback.onSuccess();
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
if (ttl != null) {
builder.ttl(ttl);
}
@ -554,21 +557,4 @@ public abstract class AbstractCalculatedFieldProcessingService {
sendMsgToRuleEngine(tenantId, entityId, TbCallback.EMPTY, attributesUpdatedMsg);
}
private FutureCallback<Void> wrapWithSuccessHandler(SettableFuture<Void> future, Runnable onSuccess) {
return new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
future.set(result);
if (onSuccess != null) {
onSuccess.run();
}
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
};
}
}

Loading…
Cancel
Save