|
|
|
@ -48,16 +48,22 @@ import org.thingsboard.server.common.data.objects.AttributesEntityView; |
|
|
|
import org.thingsboard.server.common.data.objects.TelemetryEntityView; |
|
|
|
import org.thingsboard.server.common.data.page.PageData; |
|
|
|
import org.thingsboard.server.common.data.page.PageLink; |
|
|
|
import org.thingsboard.server.common.data.query.DeviceTypeFilter; |
|
|
|
import org.thingsboard.server.common.data.query.EntityKey; |
|
|
|
import org.thingsboard.server.common.data.query.EntityKeyType; |
|
|
|
import org.thingsboard.server.common.data.query.EntityViewTypeFilter; |
|
|
|
import org.thingsboard.server.common.data.security.Authority; |
|
|
|
import org.thingsboard.server.common.data.security.DeviceCredentials; |
|
|
|
import org.thingsboard.server.dao.model.ModelConstants; |
|
|
|
import org.thingsboard.server.queue.memory.InMemoryStorage; |
|
|
|
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static java.util.concurrent.TimeUnit.MILLISECONDS; |
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS; |
|
|
|
@ -71,12 +77,14 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; |
|
|
|
|
|
|
|
@TestPropertySource(properties = { |
|
|
|
"transport.mqtt.enabled=true", |
|
|
|
"js.evaluator=mock", |
|
|
|
}) |
|
|
|
@Slf4j |
|
|
|
public abstract class BaseEntityViewControllerTest extends AbstractControllerTest { |
|
|
|
static final int TIMEOUT = 30; |
|
|
|
static final TypeReference<PageData<EntityView>> PAGE_DATA_ENTITY_VIEW_TYPE_REF = new TypeReference<>() { |
|
|
|
}; |
|
|
|
static final TypeReference<PageData<EntityViewInfo>> PAGE_DATA_ENTITY_VIEW_INFO_TYPE_REF = new TypeReference<>() { |
|
|
|
}; |
|
|
|
|
|
|
|
private Tenant savedTenant; |
|
|
|
private User tenantAdmin; |
|
|
|
@ -86,13 +94,10 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
List<ListenableFuture<ResultActions>> deleteFutures = new ArrayList<>(); |
|
|
|
ListeningExecutorService executor; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
InMemoryStorage inMemoryStorage; |
|
|
|
|
|
|
|
@Before |
|
|
|
public void beforeTest() throws Exception { |
|
|
|
log.warn("beforeTest"); |
|
|
|
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(16, getClass())); |
|
|
|
log.debug("beforeTest"); |
|
|
|
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(8, getClass())); |
|
|
|
|
|
|
|
loginSysAdmin(); |
|
|
|
|
|
|
|
@ -108,7 +113,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1"); |
|
|
|
|
|
|
|
Device device = new Device(); |
|
|
|
device.setName("Test device"); |
|
|
|
device.setName("Test device 4view"); |
|
|
|
device.setType("default"); |
|
|
|
testDevice = doPost("/api/device", device, Device.class); |
|
|
|
|
|
|
|
@ -128,7 +133,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
|
|
|
|
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) |
|
|
|
.andExpect(status().isOk()); |
|
|
|
log.warn("after test"); |
|
|
|
log.debug("after test"); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
@ -304,9 +309,8 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
} |
|
|
|
Futures.allAsList(deleteFutures).get(TIMEOUT, SECONDS); |
|
|
|
|
|
|
|
PageData<EntityView> pageData = doGetTypedWithPageLink(urlTemplate, |
|
|
|
new TypeReference<PageData<EntityView>>() { |
|
|
|
}, new PageLink(4, 0, name1)); |
|
|
|
PageData<EntityView> pageData = doGetTypedWithPageLink(urlTemplate, PAGE_DATA_ENTITY_VIEW_TYPE_REF, |
|
|
|
new PageLink(4, 0, name1)); |
|
|
|
Assert.assertFalse(pageData.hasNext()); |
|
|
|
assertEquals(0, pageData.getData().size()); |
|
|
|
|
|
|
|
@ -317,8 +321,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
} |
|
|
|
Futures.allAsList(deleteFutures).get(TIMEOUT, SECONDS); |
|
|
|
|
|
|
|
pageData = doGetTypedWithPageLink(urlTemplate, new TypeReference<PageData<EntityView>>() { |
|
|
|
}, |
|
|
|
pageData = doGetTypedWithPageLink(urlTemplate, PAGE_DATA_ENTITY_VIEW_TYPE_REF, |
|
|
|
new PageLink(4, 0, name2)); |
|
|
|
Assert.assertFalse(pageData.hasNext()); |
|
|
|
assertEquals(0, pageData.getData().size()); |
|
|
|
@ -358,9 +361,8 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
} |
|
|
|
Futures.allAsList(deleteFutures).get(TIMEOUT, SECONDS); |
|
|
|
|
|
|
|
PageData<EntityView> pageData = doGetTypedWithPageLink("/api/tenant/entityViews?", |
|
|
|
new TypeReference<PageData<EntityView>>() { |
|
|
|
}, new PageLink(4, 0, name1)); |
|
|
|
PageData<EntityView> pageData = doGetTypedWithPageLink("/api/tenant/entityViews?", PAGE_DATA_ENTITY_VIEW_TYPE_REF, |
|
|
|
new PageLink(4, 0, name1)); |
|
|
|
Assert.assertFalse(pageData.hasNext()); |
|
|
|
assertEquals(0, pageData.getData().size()); |
|
|
|
|
|
|
|
@ -382,11 +384,10 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
Set<String> expectedActualAttributesSet = Set.of("caKey1", "caKey2", "caKey3", "caKey4"); |
|
|
|
Set<String> actualAttributesSet = |
|
|
|
getAttributesByKeys("{\"caKey1\":\"value1\", \"caKey2\":true, \"caKey3\":42.0, \"caKey4\":73}", expectedActualAttributesSet); |
|
|
|
|
|
|
|
log.warn("got correct actualAttributesSet, saving new entity view..."); |
|
|
|
log.debug("got correct actualAttributesSet, saving new entity view..."); |
|
|
|
EntityView savedView = getNewSavedEntityView("Test entity view"); |
|
|
|
|
|
|
|
log.warn("fetching entity view telemetry..."); |
|
|
|
log.debug("fetching entity view telemetry..."); |
|
|
|
List<Map<String, Object>> values = await("telemetry/ENTITY_VIEW") |
|
|
|
.atMost(TIMEOUT, SECONDS) |
|
|
|
.until(() -> doGetAsyncTyped("/api/plugins/telemetry/ENTITY_VIEW/" + savedView.getId().getId().toString() + |
|
|
|
@ -394,7 +395,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
}), |
|
|
|
x -> x.size() >= expectedActualAttributesSet.size()); |
|
|
|
|
|
|
|
log.warn("asserting..."); |
|
|
|
log.debug("asserting..."); |
|
|
|
assertEquals("value1", getValue(values, "caKey1")); |
|
|
|
assertEquals(true, getValue(values, "caKey2")); |
|
|
|
assertEquals(42.0, getValue(values, "caKey3")); |
|
|
|
@ -430,18 +431,40 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
|
|
|
|
@Test |
|
|
|
public void testGetTelemetryWhenEntityViewTimeRangeInsideTimestampRange() throws Exception { |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value1\", \"tsKey2\":true, \"tsKey3\":40.0}"); |
|
|
|
Thread.sleep(1000); |
|
|
|
DeviceTypeFilter dtf = new DeviceTypeFilter(testDevice.getType(), testDevice.getName()); |
|
|
|
List<String> tsKeys = List.of("tsKey1", "tsKey2", "tsKey3"); |
|
|
|
|
|
|
|
DeviceCredentials deviceCredentials =doGet("/api/device/" + testDevice.getId().getId() + "/credentials", DeviceCredentials.class); |
|
|
|
assertEquals(testDevice.getId(), deviceCredentials.getDeviceId()); |
|
|
|
String accessToken = deviceCredentials.getCredentialsId(); |
|
|
|
assertNotNull(accessToken); |
|
|
|
|
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
getWsClient().subscribeTsUpdate(tsKeys, now, TimeUnit.HOURS.toMillis(1), dtf); |
|
|
|
|
|
|
|
getWsClient().registerWaitForUpdate(); |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value1\", \"tsKey2\":true, \"tsKey3\":40.0}", accessToken); |
|
|
|
getWsClient().waitForUpdate(); |
|
|
|
|
|
|
|
long startTimeMs = System.currentTimeMillis(); |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value2\", \"tsKey2\":false, \"tsKey3\":80.0}"); |
|
|
|
Thread.sleep(1000); |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value3\", \"tsKey2\":false, \"tsKey3\":120.0}"); |
|
|
|
|
|
|
|
getWsClient().registerWaitForUpdate(); |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value2\", \"tsKey2\":false, \"tsKey3\":80.0}", accessToken); |
|
|
|
getWsClient().waitForUpdate(); |
|
|
|
|
|
|
|
Thread.sleep(3); |
|
|
|
|
|
|
|
getWsClient().registerWaitForUpdate(); |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value3\", \"tsKey2\":false, \"tsKey3\":120.0}", accessToken); |
|
|
|
getWsClient().waitForUpdate(); |
|
|
|
|
|
|
|
long endTimeMs = System.currentTimeMillis(); |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value4\", \"tsKey2\":true, \"tsKey3\":160.0}"); |
|
|
|
getWsClient().registerWaitForUpdate(); |
|
|
|
uploadTelemetry("{\"tsKey1\":\"value4\", \"tsKey2\":true, \"tsKey3\":160.0}", accessToken); |
|
|
|
getWsClient().waitForUpdate(); |
|
|
|
|
|
|
|
String deviceId = testDevice.getId().getId().toString(); |
|
|
|
Set<String> keys = getTelemetryKeys("DEVICE", deviceId); |
|
|
|
Thread.sleep(1000); |
|
|
|
|
|
|
|
EntityView view = createEntityView("Test entity view", startTimeMs, endTimeMs); |
|
|
|
EntityView savedView = doPost("/api/entityView", view, EntityView.class); |
|
|
|
@ -458,15 +481,8 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
Assert.assertEquals(1, actualValues.get("tsKey3").size()); |
|
|
|
} |
|
|
|
|
|
|
|
private void uploadTelemetry(String strKvs) throws Exception { |
|
|
|
|
|
|
|
private void uploadTelemetry(String strKvs, String accessToken) throws Exception { |
|
|
|
String viewDeviceId = testDevice.getId().getId().toString(); |
|
|
|
DeviceCredentials deviceCredentials = |
|
|
|
doGet("/api/device/" + viewDeviceId + "/credentials", DeviceCredentials.class); |
|
|
|
assertEquals(testDevice.getId(), deviceCredentials.getDeviceId()); |
|
|
|
|
|
|
|
String accessToken = deviceCredentials.getCredentialsId(); |
|
|
|
assertNotNull(accessToken); |
|
|
|
|
|
|
|
String clientId = MqttAsyncClient.generateClientId(); |
|
|
|
MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:1883", clientId, new MemoryPersistence()); |
|
|
|
@ -478,7 +494,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
MqttMessage message = new MqttMessage(); |
|
|
|
message.setPayload(strKvs.getBytes()); |
|
|
|
IMqttDeliveryToken token = client.publish("v1/devices/me/telemetry", message); |
|
|
|
await("mqtt ack").pollInterval(10, MILLISECONDS).atMost(TIMEOUT, SECONDS).until(() -> token.getMessage() == null); |
|
|
|
await("mqtt ack").pollInterval(5, MILLISECONDS).atMost(TIMEOUT, SECONDS).until(() -> token.getMessage() == null); |
|
|
|
client.disconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
@ -504,7 +520,15 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
} |
|
|
|
|
|
|
|
private Set<String> getAttributesByKeys(String stringKV, Set<String> expectedKeySet) throws Exception { |
|
|
|
DeviceTypeFilter dtf = new DeviceTypeFilter(testDevice.getType(), testDevice.getName()); |
|
|
|
List<EntityKey> keysToSubscribe = expectedKeySet.stream() |
|
|
|
.map(key -> new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, key)) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
|
|
|
|
getWsClient().subscribeLatestUpdate(keysToSubscribe, dtf); |
|
|
|
|
|
|
|
String viewDeviceId = testDevice.getId().getId().toString(); |
|
|
|
log.debug("deviceid {}", viewDeviceId); |
|
|
|
DeviceCredentials deviceCredentials = |
|
|
|
doGet("/api/device/" + viewDeviceId + "/credentials", DeviceCredentials.class); |
|
|
|
assertEquals(testDevice.getId(), deviceCredentials.getDeviceId()); |
|
|
|
@ -512,6 +536,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
String accessToken = deviceCredentials.getCredentialsId(); |
|
|
|
assertNotNull(accessToken); |
|
|
|
|
|
|
|
log.debug("creating mqtt client..."); |
|
|
|
String clientId = MqttAsyncClient.generateClientId(); |
|
|
|
MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:1883", clientId, new MemoryPersistence()); |
|
|
|
|
|
|
|
@ -519,21 +544,16 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
options.setUserName(accessToken); |
|
|
|
client.connect(options); |
|
|
|
awaitConnected(client, SECONDS.toMillis(30)); |
|
|
|
|
|
|
|
log.debug("mqtt connected..."); |
|
|
|
MqttMessage message = new MqttMessage(); |
|
|
|
message.setPayload((stringKV).getBytes()); |
|
|
|
getWsClient().registerWaitForUpdate(); |
|
|
|
IMqttDeliveryToken token = client.publish("v1/devices/me/attributes", message); |
|
|
|
log.warn("token.message {}", token.getMessage()); |
|
|
|
await("mqtt ack").pollInterval(10, MILLISECONDS).atMost(TIMEOUT, SECONDS).until(() -> token.getMessage() == null); |
|
|
|
log.warn("token.message delivered {}", token.getMessage()); |
|
|
|
client.disconnect(); |
|
|
|
|
|
|
|
return await().pollInterval(20, MILLISECONDS).atMost(TIMEOUT, SECONDS) |
|
|
|
.until(() -> { |
|
|
|
inMemoryStorage.printStats(); |
|
|
|
return getAttributeKeys("DEVICE", viewDeviceId); |
|
|
|
}, |
|
|
|
keys -> keys.containsAll(expectedKeySet)); |
|
|
|
log.debug("publish token.message {}", token.getMessage()); |
|
|
|
await("mqtt ack").pollInterval(5, MILLISECONDS).atMost(TIMEOUT, SECONDS).until(() -> token.getMessage() == null); |
|
|
|
log.debug("token.message delivered {}", token.getMessage()); |
|
|
|
assertThat(getWsClient().waitForUpdate()).as("ws update received").isNotBlank(); |
|
|
|
return getAttributeKeys("DEVICE", viewDeviceId); |
|
|
|
} |
|
|
|
|
|
|
|
private Object getValue(List<Map<String, Object>> values, String stringValue) { |
|
|
|
@ -624,8 +644,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
|
|
|
List<EntityViewInfo> loadedItems = new ArrayList<>(); |
|
|
|
PageData<EntityViewInfo> pageData; |
|
|
|
do { |
|
|
|
pageData = doGetTypedWithPageLink(urlTemplate, new TypeReference<PageData<EntityViewInfo>>() { |
|
|
|
}, pageLink); |
|
|
|
pageData = doGetTypedWithPageLink(urlTemplate, PAGE_DATA_ENTITY_VIEW_INFO_TYPE_REF, pageLink); |
|
|
|
loadedItems.addAll(pageData.getData()); |
|
|
|
if (pageData.hasNext()) { |
|
|
|
pageLink = pageLink.nextPageLink(); |
|
|
|
|