Browse Source

minor fixes

pull/13040/head
IrynaMatveieva 1 year ago
parent
commit
48d9aca0ab
  1. 2
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  2. 6
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java
  3. 12
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
  4. 3
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  5. 8
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java
  6. 5
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java
  7. 4
      application/src/main/resources/thingsboard.yml
  8. 4
      application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java
  9. 2
      common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java
  10. 2
      common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
  11. 4
      common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitProfileEntityMsg.java
  12. 1
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java
  13. 3
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  14. 3
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
  15. 6
      dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java

2
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -114,7 +114,7 @@ public class AppActor extends ContextAwareActor {
ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
break;
case CF_CACHE_INIT_MSG:
case CF_PROFILE_ENTITY_MSG:
case CF_INIT_PROFILE_ENTITY_MSG:
case CF_INIT_MSG:
case CF_LINK_INIT_MSG:
case CF_STATE_RESTORE_MSG:

6
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java

@ -25,9 +25,9 @@ import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitProfileEntityMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldProfileEntityMsg;
/**
* Created by ashvayka on 15.03.18.
@ -70,8 +70,8 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor {
case CF_CACHE_INIT_MSG:
processor.onCacheInitMsg((CalculatedFieldCacheInitMsg) msg);
break;
case CF_PROFILE_ENTITY_MSG:
processor.onProfileEntityMsg((CalculatedFieldProfileEntityMsg) msg);
case CF_INIT_PROFILE_ENTITY_MSG:
processor.onProfileEntityMsg((CalculatedFieldInitProfileEntityMsg) msg);
break;
case CF_INIT_MSG:
processor.onFieldInitMsg((CalculatedFieldInitMsg) msg);

12
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

@ -38,9 +38,9 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.cf.CalculatedFieldCacheInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitProfileEntityMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldProfileEntityMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
@ -86,7 +86,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
protected TbActorCtx ctx;
@Value("${calculated_fields.init_fetch_pack_size:50000}")
@Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}")
@Getter
private int initFetchPackSize;
@ -123,7 +123,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
msg.getCallback().onSuccess();
}
public void onProfileEntityMsg(CalculatedFieldProfileEntityMsg msg) {
public void onProfileEntityMsg(CalculatedFieldInitProfileEntityMsg msg) {
log.debug("[{}] Processing profile entity message.", msg.getTenantId().getId());
entityProfileCache.add(msg.getProfileEntityId(), msg.getEntityId());
msg.getCallback().onSuccess();
@ -538,7 +538,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
return switch (entityId.getEntityType()) {
case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId();
case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
default -> null;
default ->
throw new IllegalArgumentException("'" + entityId.getEntityType() + "' is not profile entity." + entityId);
};
}
@ -567,10 +568,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void initCalculatedFields() {
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), initFetchPackSize);
cfs.forEach(cf -> {
log.trace("Processing calculated field record: {}", cf);
try {
onFieldInitMsg(new CalculatedFieldInitMsg(cf.getTenantId(), cf));
} catch (CalculatedFieldException e) {
throw new RuntimeException(e);
log.error("Failed to process calculated field record: {}", cf, e);
}
});
calculatedFields.values().forEach(cf -> {

3
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -178,7 +178,7 @@ public class TenantActor extends RuleChainManagerActor {
onRuleChainMsg((RuleChainAwareMsg) msg);
break;
case CF_CACHE_INIT_MSG:
case CF_PROFILE_ENTITY_MSG:
case CF_INIT_PROFILE_ENTITY_MSG:
case CF_INIT_MSG:
case CF_LINK_INIT_MSG:
case CF_STATE_RESTORE_MSG:
@ -202,6 +202,7 @@ public class TenantActor extends RuleChainManagerActor {
} else {
log.debug("[{}] CF Actor is not initialized. ToCalculatedFieldSystemMsg: [{}]", tenantId, msg);
}
msg.getCallback().onSuccess();
return;
}
if (priority) {

8
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java

@ -23,7 +23,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.ProfileEntityIdInfo;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.cf.CalculatedFieldProfileEntityMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitProfileEntityMsg;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.queue.util.AfterStartUp;
@ -39,7 +39,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer
private final DeviceService deviceService;
private final ActorSystemContext actorSystemContext;
@Value("${calculated_fields.init_fetch_pack_size:50000}")
@Value("${queue.calculated_fields.init_fetch_pack_size:50000}")
@Getter
private int initFetchPackSize;
@ -49,7 +49,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer
for (ProfileEntityIdInfo idInfo : deviceIdInfos) {
log.trace("Processing device record: {}", idInfo);
try {
actorSystemContext.tell(new CalculatedFieldProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()));
actorSystemContext.tell(new CalculatedFieldInitProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()));
} catch (Exception e) {
log.error("Failed to process device record: {}", idInfo, e);
}
@ -58,7 +58,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer
for (ProfileEntityIdInfo idInfo : assetIdInfos) {
log.trace("Processing asset record: {}", idInfo);
try {
actorSystemContext.tell(new CalculatedFieldProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()));
actorSystemContext.tell(new CalculatedFieldInitProfileEntityMsg(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()));
} catch (Exception e) {
log.error("Failed to process asset record: {}", idInfo, e);
}

5
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java

@ -131,6 +131,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
stateService.restore(queueKey, partitions);
}
});
// eventConsumer's partitions will be updated by stateService
// Cleanup old entities after corresponding consumers are stopped.
// Any periodic tasks need to check that the entity is still managed by the current server before processing.
@ -174,9 +175,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa
packSubmitFuture.cancel(true);
log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
}
if (log.isDebugEnabled()) {
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
}
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
consumer.commit();

4
application/src/main/resources/thingsboard.yml

@ -1848,6 +1848,10 @@ queue:
pool_size: "${TB_QUEUE_CF_POOL_SIZE:8}"
# RocksDB path for storing CF states
rocks_db_path: "${TB_QUEUE_CF_ROCKS_DB_PATH:${user.home}/.rocksdb/cf_states}"
# The fetch size specifies how many rows will be returned
init_fetch_pack_size: "${TB_QUEUE_CF_FETCH_PACK_SIZE:50000}"
# The fetch size specifies how many rows will be returned
init_tenant_fetch_pack_size: "${TB_QUEUE_CF_TENANT_FETCH_PACK_SIZE:1000}"
transport:
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"

4
application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java

@ -24,6 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@ -51,6 +52,8 @@ public class DefaultTbCoreConsumerServiceTest {
private TbCoreConsumerStats statsMock;
@Mock
private RuleEngineCallService ruleEngineCallServiceMock;
@Mock
private Logger logMock;
@Mock
private TbCallback tbCallbackMock;
@ -69,6 +72,7 @@ public class DefaultTbCoreConsumerServiceTest {
executor = MoreExecutors.newDirectExecutorService();
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stateService", stateServiceMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "deviceActivityEventsExecutor", executor);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "log", logMock);
}
@AfterEach

2
common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java

@ -55,8 +55,6 @@ public interface CalculatedFieldService extends EntityDaoService {
List<CalculatedFieldLink> findAllCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId);
List<CalculatedFieldLink> findAllCalculatedFieldLinksByTenantId(TenantId tenantId);
PageData<CalculatedFieldLink> findAllCalculatedFieldLinksByTenantId(TenantId tenantId, PageLink pageLink);
PageData<CalculatedFieldLink> findAllCalculatedFieldLinks(PageLink pageLink);

2
common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java

@ -137,7 +137,7 @@ public enum MsgType {
CF_CACHE_INIT_MSG, // Sent to init caches for CF actor;
CF_PROFILE_ENTITY_MSG, // Sent to init profile entities cache;
CF_INIT_PROFILE_ENTITY_MSG, // Sent to init profile entities cache;
CF_INIT_MSG, // Sent to init particular calculated field;
CF_LINK_INIT_MSG, // Sent to init particular calculated field;
CF_STATE_RESTORE_MSG, // Sent to restore particular calculated field entity state;

4
common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldProfileEntityMsg.java → common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitProfileEntityMsg.java

@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
@Data
public class CalculatedFieldProfileEntityMsg implements ToCalculatedFieldSystemMsg {
public class CalculatedFieldInitProfileEntityMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final EntityId profileEntityId;
@ -30,7 +30,7 @@ public class CalculatedFieldProfileEntityMsg implements ToCalculatedFieldSystemM
@Override
public MsgType getMsgType() {
return MsgType.CF_PROFILE_ENTITY_MSG;
return MsgType.CF_INIT_PROFILE_ENTITY_MSG;
}
}

1
common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java

@ -18,7 +18,6 @@ package org.thingsboard.server.queue.discovery.event;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.QueueKey;

3
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -517,6 +517,9 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TopicPartitionInfo tpi) {
String queueName = DataConstants.CF_QUEUE_NAME;
if (tpi == null) {
throw new IllegalArgumentException("TopicPartitionInfo is required.");
}
TenantId tenantId = tpi.getTenantId().orElse(TenantId.SYS_TENANT_ID);
Integer partitionId = tpi.getPartition().orElseThrow(() -> new IllegalArgumentException("PartitionId is required."));
String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId);

3
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -318,6 +318,9 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TopicPartitionInfo tpi) {
String queueName = DataConstants.CF_QUEUE_NAME;
if (tpi == null) {
throw new IllegalArgumentException("TopicPartitionInfo is required.");
}
TenantId tenantId = tpi.getTenantId().orElse(TenantId.SYS_TENANT_ID);
Integer partitionId = tpi.getPartition().orElseThrow(() -> new IllegalArgumentException("PartitionId is required."));
String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId);

6
dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java

@ -182,12 +182,6 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
return calculatedFieldLinkDao.findCalculatedFieldLinksByEntityId(tenantId, entityId);
}
@Override
public List<CalculatedFieldLink> findAllCalculatedFieldLinksByTenantId(TenantId tenantId) {
log.trace("Executing findAllCalculatedFieldLinksByTenantId, tenantId [{}]", tenantId);
return calculatedFieldLinkDao.findCalculatedFieldLinksByTenantId(tenantId);
}
@Override
public PageData<CalculatedFieldLink> findAllCalculatedFieldLinksByTenantId(TenantId tenantId, PageLink pageLink) {
log.trace("Executing findAllCalculatedFieldLinksByTenantId, tenantId[{}] pageLink [{}]", tenantId, pageLink);

Loading…
Cancel
Save