|
|
|
@ -47,7 +47,7 @@ import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@Service |
|
|
|
@RequiredArgsConstructor |
|
|
|
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true) |
|
|
|
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true) // Queue type in mem or Kafka;
|
|
|
|
public class RocksDBCalculatedFieldStateService implements CalculatedFieldStateService { |
|
|
|
|
|
|
|
private final ActorSystemContext actorSystemContext; |
|
|
|
@ -66,12 +66,11 @@ public class RocksDBCalculatedFieldStateService implements CalculatedFieldStateS |
|
|
|
restoreStates().forEach((k, v) -> actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(k, v))); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { |
|
|
|
CalculatedFieldStateProto stateProto = toProto(stateId, state); |
|
|
|
long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes(); |
|
|
|
if (maxStateSizeInKBytes <= 0 || stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) { |
|
|
|
if (maxStateSizeInKBytes <= 0 || stateProto.getSerializedSize() <= maxStateSizeInKBytes) { |
|
|
|
rocksDBService.put(toProto(stateId), stateProto); |
|
|
|
} |
|
|
|
callback.onSuccess(); |
|
|
|
|