|
Before Width: | Height: | Size: 113 KiB After Width: | Height: | Size: 111 KiB |
|
Before Width: | Height: | Size: 139 KiB After Width: | Height: | Size: 137 KiB |
|
Before Width: | Height: | Size: 50 KiB After Width: | Height: | Size: 49 KiB |
|
Before Width: | Height: | Size: 115 KiB After Width: | Height: | Size: 113 KiB |
|
Before Width: | Height: | Size: 113 KiB After Width: | Height: | Size: 111 KiB |
|
Before Width: | Height: | Size: 120 KiB After Width: | Height: | Size: 118 KiB |
|
Before Width: | Height: | Size: 121 KiB After Width: | Height: | Size: 119 KiB |
|
Before Width: | Height: | Size: 114 KiB After Width: | Height: | Size: 112 KiB |
|
Before Width: | Height: | Size: 6.9 KiB After Width: | Height: | Size: 6.9 KiB |
|
Before Width: | Height: | Size: 7.0 KiB After Width: | Height: | Size: 7.0 KiB |
|
Before Width: | Height: | Size: 48 KiB After Width: | Height: | Size: 48 KiB |
|
Before Width: | Height: | Size: 7.1 KiB After Width: | Height: | Size: 7.1 KiB |
|
Before Width: | Height: | Size: 51 KiB After Width: | Height: | Size: 50 KiB |
|
Before Width: | Height: | Size: 101 KiB After Width: | Height: | Size: 99 KiB |
|
Before Width: | Height: | Size: 105 KiB After Width: | Height: | Size: 104 KiB |
|
Before Width: | Height: | Size: 118 KiB After Width: | Height: | Size: 117 KiB |
|
Before Width: | Height: | Size: 120 KiB After Width: | Height: | Size: 118 KiB |
|
Before Width: | Height: | Size: 122 KiB After Width: | Height: | Size: 120 KiB |
|
Before Width: | Height: | Size: 108 KiB After Width: | Height: | Size: 107 KiB |
|
Before Width: | Height: | Size: 120 KiB After Width: | Height: | Size: 119 KiB |
|
Before Width: | Height: | Size: 101 KiB After Width: | Height: | Size: 100 KiB |
|
Before Width: | Height: | Size: 50 KiB After Width: | Height: | Size: 50 KiB |
|
Before Width: | Height: | Size: 113 KiB After Width: | Height: | Size: 112 KiB |
@ -0,0 +1,74 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors.service; |
|||
|
|||
import org.junit.jupiter.api.Assertions; |
|||
import org.junit.jupiter.api.BeforeEach; |
|||
import org.junit.jupiter.api.Test; |
|||
import org.mockito.BDDMockito; |
|||
import org.mockito.Mockito; |
|||
import org.springframework.test.util.ReflectionTestUtils; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.actors.shared.ComponentMsgProcessor; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.msg.TbActorStopReason; |
|||
|
|||
import java.util.concurrent.ScheduledFuture; |
|||
|
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.anyLong; |
|||
|
|||
class ComponentActorTest { |
|||
ComponentActor componentActor; |
|||
|
|||
@BeforeEach |
|||
void setUp() { |
|||
componentActor = Mockito.mock(ComponentActor.class); |
|||
} |
|||
|
|||
@Test |
|||
void scheduleStatsPersistTickTest() { |
|||
Assertions.assertNull(componentActor.statsScheduledFuture); |
|||
ScheduledFuture<?> statsScheduledFuture = Mockito.mock(ScheduledFuture.class); |
|||
ActorSystemContext systemContext = Mockito.mock(ActorSystemContext.class); |
|||
ReflectionTestUtils.setField(componentActor, "systemContext", systemContext); |
|||
ComponentMsgProcessor<?> processor = Mockito.mock(ComponentMsgProcessor.class); |
|||
componentActor.processor = processor; |
|||
BDDMockito.willReturn(statsScheduledFuture).given(processor).scheduleStatsPersistTick(any(), anyLong()); |
|||
BDDMockito.willCallRealMethod().given(componentActor).scheduleStatsPersistTick(); |
|||
|
|||
componentActor.scheduleStatsPersistTick(); |
|||
|
|||
Assertions.assertNotNull(componentActor.statsScheduledFuture); |
|||
} |
|||
|
|||
@Test |
|||
void destroyTest() { |
|||
ScheduledFuture<?> statsScheduledFuture = Mockito.mock(ScheduledFuture.class); |
|||
componentActor.statsScheduledFuture = statsScheduledFuture; |
|||
Assertions.assertNotNull(componentActor.statsScheduledFuture); |
|||
Throwable cause = new Throwable(); |
|||
EntityId id = Mockito.mock(EntityId.class); |
|||
ReflectionTestUtils.setField(componentActor, "id", id); |
|||
BDDMockito.willCallRealMethod().given(componentActor).destroy(any(), any()); |
|||
|
|||
componentActor.destroy(TbActorStopReason.STOPPED, cause); |
|||
|
|||
Mockito.verify(statsScheduledFuture).cancel(false); |
|||
Assertions.assertNull(componentActor.statsScheduledFuture); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,130 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.jdbc.core.JdbcTemplate; |
|||
import org.springframework.test.context.TestPropertySource; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.AttributeScope; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.edqs.EdqsSyncRequest; |
|||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.query.DeviceTypeFilter; |
|||
import org.thingsboard.server.common.data.query.EntityData; |
|||
import org.thingsboard.server.common.data.query.EntityDataPageLink; |
|||
import org.thingsboard.server.common.data.query.EntityDataQuery; |
|||
import org.thingsboard.server.common.data.query.EntityKey; |
|||
import org.thingsboard.server.common.data.query.EntityKeyType; |
|||
import org.thingsboard.server.common.data.query.EntityKeyValueType; |
|||
import org.thingsboard.server.common.data.query.FilterPredicateValue; |
|||
import org.thingsboard.server.common.data.query.KeyFilter; |
|||
import org.thingsboard.server.common.data.query.StringFilterPredicate; |
|||
import org.thingsboard.server.dao.service.DaoSqlTest; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.awaitility.Awaitility.await; |
|||
|
|||
@DaoSqlTest |
|||
@TestPropertySource(properties = { |
|||
"queue.edqs.sync.enabled=true", |
|||
"queue.edqs.api.supported=true", |
|||
"queue.edqs.api.auto_enable=true", |
|||
"queue.edqs.mode=local" |
|||
}) |
|||
public class EdqsControllerTest extends AbstractControllerTest { |
|||
|
|||
@Autowired |
|||
private JdbcTemplate jdbcTemplate; |
|||
|
|||
@Before |
|||
public void beforeEdqsControllerTest() throws Exception { |
|||
loginTenantAdmin(); |
|||
} |
|||
|
|||
@Test |
|||
public void testEdqsSync() throws Exception { |
|||
List<Device> devices = new ArrayList<>(); |
|||
for (int i = 0; i < 3; i++) { |
|||
Device device = new Device(); |
|||
device.setName("Device" + i); |
|||
device.setType("default"); |
|||
device.setLabel("testLabel" + (int) (Math.random() * 1000)); |
|||
ObjectNode additionalInfo = JacksonUtil.newObjectNode(); |
|||
additionalInfo.put("gateway", true); |
|||
device.setAdditionalInfo(additionalInfo); |
|||
devices.add(doPost("/api/device", device, Device.class)); |
|||
Thread.sleep(1); |
|||
} |
|||
|
|||
DeviceTypeFilter filter = new DeviceTypeFilter(); |
|||
filter.setDeviceTypes(List.of("default")); |
|||
filter.setDeviceNameFilter(""); |
|||
|
|||
List<EntityKey> entityFields = Collections.singletonList(new EntityKey(EntityKeyType.ENTITY_FIELD, "name")); |
|||
|
|||
EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null); |
|||
EntityDataQuery query = new EntityDataQuery(filter, pageLink, entityFields, null, Collections.singletonList(getGatewayFilter())); |
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS) |
|||
.until(() -> doPostWithTypedResponse("/api/entitiesQuery/find", query, new TypeReference<PageData<EntityData>>() { |
|||
}), result -> result.getTotalElements() == 3); |
|||
|
|||
// update db
|
|||
Device device1 = devices.get(0); |
|||
device1.setAdditionalInfo(JacksonUtil.newObjectNode()); |
|||
jdbcTemplate.execute("update device set additional_info = '{}' where id = '" + device1.getId().getId().toString() + "'"); |
|||
|
|||
// do edqs sync
|
|||
loginSysAdmin(); |
|||
ToCoreEdqsRequest syncRequest = new ToCoreEdqsRequest(new EdqsSyncRequest(), null); |
|||
doPost("/api/edqs/system/request", syncRequest); |
|||
|
|||
//check sync is finished
|
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> { |
|||
Optional<AttributeKvEntry> attribute = attributesService.find(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, AttributeScope.SERVER_SCOPE, "edqsSyncState").get(); |
|||
return attribute.isPresent() && attribute.get().getJsonValue().isPresent() && |
|||
attribute.get().getJsonValue().get().contains("\"status\":\"FINISHED\""); |
|||
}); |
|||
|
|||
// check if the count is updated
|
|||
loginTenantAdmin(); |
|||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> doPostWithTypedResponse("/api/entitiesQuery/find", query, new TypeReference<PageData<EntityData>>() { |
|||
}), result -> result.getTotalElements() == 2); |
|||
} |
|||
|
|||
private KeyFilter getGatewayFilter() { |
|||
KeyFilter additionalInfoFilter = new KeyFilter(); |
|||
additionalInfoFilter.setKey(new EntityKey(EntityKeyType.ENTITY_FIELD, "additionalInfo")); |
|||
additionalInfoFilter.setValueType(EntityKeyValueType.STRING); |
|||
StringFilterPredicate predicate = new StringFilterPredicate(); |
|||
predicate.setValue(FilterPredicateValue.fromString("\"gateway\":true")); |
|||
predicate.setOperation(StringFilterPredicate.StringOperation.CONTAINS); |
|||
additionalInfoFilter.setPredicate(predicate); |
|||
return additionalInfoFilter; |
|||
} |
|||
} |
|||
@ -1,98 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2025 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.common.consumer; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.queue.TbQueueMsg; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.HashSet; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.locks.ReadWriteLock; |
|||
import java.util.concurrent.locks.ReentrantReadWriteLock; |
|||
|
|||
import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTopic; |
|||
|
|||
@Slf4j |
|||
public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> { |
|||
|
|||
private PartitionedQueueConsumerManager<S> stateConsumer; |
|||
private PartitionedQueueConsumerManager<E> eventConsumer; |
|||
|
|||
@Getter |
|||
private Set<TopicPartitionInfo> partitions; |
|||
private final Set<TopicPartitionInfo> partitionsInProgress = ConcurrentHashMap.newKeySet(); |
|||
private boolean initialized; |
|||
|
|||
private final ReadWriteLock partitionsLock = new ReentrantReadWriteLock(); |
|||
|
|||
public void init(PartitionedQueueConsumerManager<S> stateConsumer, PartitionedQueueConsumerManager<E> eventConsumer) { |
|||
this.stateConsumer = stateConsumer; |
|||
this.eventConsumer = eventConsumer; |
|||
} |
|||
|
|||
public void update(Set<TopicPartitionInfo> newPartitions) { |
|||
newPartitions = withTopic(newPartitions, stateConsumer.getTopic()); |
|||
var writeLock = partitionsLock.writeLock(); |
|||
writeLock.lock(); |
|||
Set<TopicPartitionInfo> oldPartitions = this.partitions != null ? this.partitions : Collections.emptySet(); |
|||
Set<TopicPartitionInfo> addedPartitions; |
|||
Set<TopicPartitionInfo> removedPartitions; |
|||
try { |
|||
addedPartitions = new HashSet<>(newPartitions); |
|||
addedPartitions.removeAll(oldPartitions); |
|||
removedPartitions = new HashSet<>(oldPartitions); |
|||
removedPartitions.removeAll(newPartitions); |
|||
this.partitions = newPartitions; |
|||
} finally { |
|||
writeLock.unlock(); |
|||
} |
|||
|
|||
if (!removedPartitions.isEmpty()) { |
|||
stateConsumer.removePartitions(removedPartitions); |
|||
eventConsumer.removePartitions(withTopic(removedPartitions, eventConsumer.getTopic())); |
|||
} |
|||
|
|||
if (!addedPartitions.isEmpty()) { |
|||
partitionsInProgress.addAll(addedPartitions); |
|||
stateConsumer.addPartitions(addedPartitions, partition -> { |
|||
var readLock = partitionsLock.readLock(); |
|||
readLock.lock(); |
|||
try { |
|||
partitionsInProgress.remove(partition); |
|||
log.info("Finished partition {} (still in progress: {})", partition, partitionsInProgress); |
|||
if (partitionsInProgress.isEmpty()) { |
|||
log.info("All partitions processed"); |
|||
} |
|||
if (this.partitions.contains(partition)) { |
|||
eventConsumer.addPartitions(Set.of(partition.withTopic(eventConsumer.getTopic()))); |
|||
} |
|||
} finally { |
|||
readLock.unlock(); |
|||
} |
|||
}); |
|||
} |
|||
initialized = true; |
|||
} |
|||
|
|||
public Set<TopicPartitionInfo> getPartitionsInProgress() { |
|||
return initialized ? partitionsInProgress : null; |
|||
} |
|||
|
|||
} |
|||