Browse Source

added check for update values for ts rolling when rocksdb fails

pull/12266/head
IrynaMatveieva 1 year ago
parent
commit
befd4cc9c6
  1. 8
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java
  2. 13
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java
  3. 1
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

8
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java

@ -100,11 +100,11 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
@Override
public List<CalculatedFieldLink> getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
List<CalculatedFieldLink> cfLinks = calculatedFieldLinks.get(calculatedFieldId);
if (cfLinks == null) {
if (cfLinks == null || cfLinks.isEmpty()) {
calculatedFieldFetchLock.lock();
try {
cfLinks = calculatedFieldLinks.get(calculatedFieldId);
if (cfLinks == null) {
if (cfLinks == null || cfLinks.isEmpty()) {
cfLinks = calculatedFieldService.findAllCalculatedFieldLinksById(tenantId, calculatedFieldId);
if (cfLinks != null) {
calculatedFieldLinks.put(calculatedFieldId, cfLinks);
@ -122,11 +122,11 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
@Override
public List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId) {
List<CalculatedFieldLink> cfLinks = entityIdCalculatedFieldLinks.get(entityId);
if (cfLinks == null) {
if (cfLinks == null || cfLinks.isEmpty()) {
calculatedFieldFetchLock.lock();
try {
cfLinks = entityIdCalculatedFieldLinks.get(entityId);
if (cfLinks == null) {
if (cfLinks == null || cfLinks.isEmpty()) {
cfLinks = calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId);
if (cfLinks != null) {
entityIdCalculatedFieldLinks.put(entityId, cfLinks);

13
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java

@ -556,20 +556,21 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
CalculatedFieldState state = calculatedFieldEntityCtx.getState();
boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet());
if (!allKeysPresent) {
boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream()
.anyMatch(argument -> "TS_ROLLING".equals(argument.getType()) && state.getArguments().get(argument.getKey()) == null);
if (!allKeysPresent || requiresTsRollingUpdate) {
Map<String, Argument> missingArguments = calculatedFieldCtx.getArguments().entrySet().stream()
.filter(entry -> !argumentValues.containsKey(entry.getKey()))
.filter(entry -> !argumentValues.containsKey(entry.getKey()) || ("TS_ROLLING".equals(entry.getValue().getType()) && state.getArguments().get(entry.getKey()) == null))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentValues::putAll)
.addListener(() -> performUpdateState.accept(state),
calculatedFieldCallbackExecutor);
return;
} else {
performUpdateState.accept(state);
}
performUpdateState.accept(state);
states.put(entityCtxId, calculatedFieldEntityCtx);
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
} else {
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, calculatedFieldIds, argumentValues);
}

1
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -24,6 +24,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected Map<String, ArgumentEntry> arguments;
public BaseCalculatedFieldState() {
arguments = new HashMap<>();
}
@Override

Loading…
Cancel
Save