196 changed files with 3464 additions and 5429 deletions
File diff suppressed because one or more lines are too long
@ -0,0 +1,257 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.actors; |
|||
|
|||
import org.junit.jupiter.api.Test; |
|||
import org.junit.jupiter.api.extension.ExtendWith; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.boot.context.properties.EnableConfigurationProperties; |
|||
import org.springframework.boot.test.mock.mockito.MockBean; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.test.context.ContextConfiguration; |
|||
import org.springframework.test.context.TestPropertySource; |
|||
import org.springframework.test.context.junit.jupiter.SpringExtension; |
|||
import org.thingsboard.rule.engine.api.MailService; |
|||
import org.thingsboard.rule.engine.api.SmsService; |
|||
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; |
|||
import org.thingsboard.server.actors.service.ActorService; |
|||
import org.thingsboard.server.cluster.TbClusterService; |
|||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.attributes.AttributesService; |
|||
import org.thingsboard.server.dao.audit.AuditLogService; |
|||
import org.thingsboard.server.dao.cassandra.CassandraCluster; |
|||
import org.thingsboard.server.dao.customer.CustomerService; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.device.ClaimDevicesService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.edge.EdgeEventService; |
|||
import org.thingsboard.server.dao.edge.EdgeService; |
|||
import org.thingsboard.server.dao.entityview.EntityViewService; |
|||
import org.thingsboard.server.dao.event.EventService; |
|||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; |
|||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; |
|||
import org.thingsboard.server.dao.ota.OtaPackageService; |
|||
import org.thingsboard.server.dao.relation.RelationService; |
|||
import org.thingsboard.server.dao.resource.ResourceService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.rule.RuleNodeStateService; |
|||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; |
|||
import org.thingsboard.server.dao.tenant.TenantProfileService; |
|||
import org.thingsboard.server.dao.tenant.TenantService; |
|||
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|||
import org.thingsboard.server.dao.user.UserService; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.usagestats.TbApiUsageClient; |
|||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService; |
|||
import org.thingsboard.server.service.component.ComponentDiscoveryService; |
|||
import org.thingsboard.server.service.edge.rpc.EdgeRpcService; |
|||
import org.thingsboard.server.service.executors.DbCallbackExecutorService; |
|||
import org.thingsboard.server.service.executors.ExternalCallExecutorService; |
|||
import org.thingsboard.server.service.executors.SharedEventLoopGroupService; |
|||
import org.thingsboard.server.service.mail.MailExecutorService; |
|||
import org.thingsboard.server.service.profile.TbDeviceProfileCache; |
|||
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; |
|||
import org.thingsboard.server.service.rpc.TbRpcService; |
|||
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; |
|||
import org.thingsboard.server.service.script.JsInvokeService; |
|||
import org.thingsboard.server.service.session.DeviceSessionCacheService; |
|||
import org.thingsboard.server.service.sms.SmsExecutorService; |
|||
import org.thingsboard.server.service.state.DeviceStateService; |
|||
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService; |
|||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|||
import org.thingsboard.server.service.transport.TbCoreToTransportService; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
@ExtendWith(SpringExtension.class) |
|||
@ContextConfiguration(classes = ActorSystemContext.class) |
|||
@EnableConfigurationProperties |
|||
@TestPropertySource(properties = { |
|||
"cache.type=caffeine", |
|||
}) |
|||
public class ActorSystemContextTest { |
|||
|
|||
@Autowired |
|||
ActorSystemContext ctx; |
|||
|
|||
@MockBean |
|||
private TbApiUsageStateService apiUsageStateService; |
|||
|
|||
@MockBean |
|||
private TbApiUsageClient apiUsageClient; |
|||
|
|||
@MockBean |
|||
private TbServiceInfoProvider serviceInfoProvider; |
|||
|
|||
@MockBean |
|||
private ActorService actorService; |
|||
|
|||
@MockBean |
|||
private ComponentDiscoveryService componentService; |
|||
|
|||
@MockBean |
|||
private DataDecodingEncodingService encodingService; |
|||
|
|||
@MockBean |
|||
private DeviceService deviceService; |
|||
|
|||
@MockBean |
|||
private TbTenantProfileCache tenantProfileCache; |
|||
|
|||
@MockBean |
|||
private TbDeviceProfileCache deviceProfileCache; |
|||
|
|||
@MockBean |
|||
private AssetService assetService; |
|||
|
|||
@MockBean |
|||
private DashboardService dashboardService; |
|||
|
|||
@MockBean |
|||
private TenantService tenantService; |
|||
|
|||
@MockBean |
|||
private TenantProfileService tenantProfileService; |
|||
|
|||
@MockBean |
|||
private CustomerService customerService; |
|||
|
|||
@MockBean |
|||
private UserService userService; |
|||
|
|||
@MockBean |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@MockBean |
|||
private RuleNodeStateService ruleNodeStateService; |
|||
|
|||
@MockBean |
|||
private PartitionService partitionService; |
|||
|
|||
@MockBean |
|||
private TbClusterService clusterService; |
|||
|
|||
@MockBean |
|||
private TimeseriesService tsService; |
|||
|
|||
@MockBean |
|||
private AttributesService attributesService; |
|||
|
|||
@MockBean |
|||
private EventService eventService; |
|||
|
|||
@MockBean |
|||
private RelationService relationService; |
|||
|
|||
@MockBean |
|||
private AuditLogService auditLogService; |
|||
|
|||
@MockBean |
|||
private EntityViewService entityViewService; |
|||
|
|||
@MockBean |
|||
private TelemetrySubscriptionService tsSubService; |
|||
|
|||
@MockBean |
|||
private AlarmSubscriptionService alarmService; |
|||
|
|||
@MockBean |
|||
private JsInvokeService jsSandbox; |
|||
|
|||
@MockBean |
|||
private MailExecutorService mailExecutor; |
|||
|
|||
@MockBean |
|||
private SmsExecutorService smsExecutor; |
|||
|
|||
@MockBean |
|||
private DbCallbackExecutorService dbCallbackExecutor; |
|||
|
|||
@MockBean |
|||
private ExternalCallExecutorService externalCallExecutorService; |
|||
|
|||
@MockBean |
|||
private SharedEventLoopGroupService sharedEventLoopGroupService; |
|||
|
|||
@MockBean |
|||
private MailService mailService; |
|||
|
|||
@MockBean |
|||
private SmsService smsService; |
|||
|
|||
@MockBean |
|||
private SmsSenderFactory smsSenderFactory; |
|||
|
|||
@MockBean |
|||
private ClaimDevicesService claimDevicesService; |
|||
|
|||
@MockBean |
|||
private JsInvokeStats jsInvokeStats; |
|||
|
|||
@MockBean |
|||
private DeviceStateService deviceStateService; |
|||
|
|||
@MockBean |
|||
private DeviceSessionCacheService deviceSessionCacheService; |
|||
|
|||
@MockBean |
|||
private TbCoreToTransportService tbCoreToTransportService; |
|||
|
|||
@MockBean |
|||
private TbRuleEngineDeviceRpcService tbRuleEngineDeviceRpcService; |
|||
|
|||
@MockBean |
|||
private TbCoreDeviceRpcService tbCoreDeviceRpcService; |
|||
|
|||
@MockBean |
|||
private EdgeService edgeService; |
|||
|
|||
@MockBean |
|||
private EdgeEventService edgeEventService; |
|||
|
|||
@MockBean |
|||
private EdgeRpcService edgeRpcService; |
|||
|
|||
@MockBean |
|||
private ResourceService resourceService; |
|||
|
|||
@MockBean |
|||
private OtaPackageService otaPackageService; |
|||
|
|||
@MockBean |
|||
private TbRpcService tbRpcService; |
|||
|
|||
@MockBean |
|||
private CassandraCluster cassandraCluster; |
|||
|
|||
@MockBean |
|||
private CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor; |
|||
|
|||
@MockBean |
|||
private CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor; |
|||
|
|||
@MockBean |
|||
private RedisTemplate<String, Object> redisTemplate; |
|||
|
|||
@Test |
|||
void givenCaffeineCache_whenInit_thenIsLocalCacheTrue() { |
|||
assertThat(ctx.getCacheType()).isEqualTo("caffeine"); |
|||
assertThat(ctx.isLocalCacheType()).as("caffeine is the local cache type").isTrue(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,42 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.thingsboard.server.queue.memory.InMemoryStorage; |
|||
|
|||
@Slf4j |
|||
public abstract class AbstractInMemoryStorageTest { |
|||
|
|||
@Before |
|||
public void setUpInMemoryStorage() { |
|||
log.info("set up InMemoryStorage"); |
|||
cleanupInMemStorage(); |
|||
} |
|||
|
|||
@After |
|||
public void tearDownInMemoryStorage() { |
|||
log.info("tear down InMemoryStorage"); |
|||
cleanupInMemStorage(); |
|||
} |
|||
|
|||
public static void cleanupInMemStorage() { |
|||
InMemoryStorage.getInstance().cleanup(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,196 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; |
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.fasterxml.jackson.databind.node.ObjectNode; |
|||
import org.junit.After; |
|||
import org.junit.Assert; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.springframework.test.web.servlet.MvcResult; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.common.data.*; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.rpc.Rpc; |
|||
import org.thingsboard.server.common.data.rpc.RpcStatus; |
|||
import org.thingsboard.server.common.data.security.Authority; |
|||
|
|||
import java.util.List; |
|||
|
|||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
|||
|
|||
public abstract class BaseRpcControllerTest extends AbstractControllerTest { |
|||
|
|||
private Tenant savedTenant; |
|||
private User tenantAdmin; |
|||
|
|||
@Before |
|||
public void beforeTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
Tenant tenant = new Tenant(); |
|||
tenant.setTitle("My tenant"); |
|||
savedTenant = doPost("/api/tenant", tenant, Tenant.class); |
|||
Assert.assertNotNull(savedTenant); |
|||
|
|||
tenantAdmin = new User(); |
|||
tenantAdmin.setAuthority(Authority.TENANT_ADMIN); |
|||
tenantAdmin.setTenantId(savedTenant.getId()); |
|||
tenantAdmin.setEmail("tenant2@thingsboard.org"); |
|||
tenantAdmin.setFirstName("Joe"); |
|||
tenantAdmin.setLastName("Downs"); |
|||
|
|||
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1"); |
|||
} |
|||
|
|||
@After |
|||
public void afterTest() throws Exception { |
|||
loginSysAdmin(); |
|||
|
|||
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) |
|||
.andExpect(status().isOk()); |
|||
} |
|||
|
|||
private Device createDefaultDevice() { |
|||
Device device = new Device(); |
|||
device.setName("My device"); |
|||
device.setType("default"); |
|||
|
|||
return device; |
|||
} |
|||
|
|||
private ObjectNode createDefaultRpc() { |
|||
ObjectNode rpc = JacksonUtil.newObjectNode(); |
|||
rpc.put("method", "setGpio"); |
|||
|
|||
ObjectNode params = JacksonUtil.newObjectNode(); |
|||
|
|||
params.put("pin", 7); |
|||
params.put("value", 1); |
|||
|
|||
rpc.set("params", params); |
|||
rpc.put("persistent", true); |
|||
rpc.put("timeout", 5000); |
|||
|
|||
return rpc; |
|||
} |
|||
|
|||
private Rpc getRpcById(String rpcId) throws Exception { |
|||
return doGet("/api/rpc/persistent/" + rpcId, Rpc.class); |
|||
} |
|||
|
|||
private MvcResult removeRpcById(String rpcId) throws Exception { |
|||
return doDelete("/api/rpc/persistent/" + rpcId).andReturn(); |
|||
} |
|||
|
|||
@Test |
|||
public void testSaveRpc() throws Exception { |
|||
Device device = createDefaultDevice(); |
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
|
|||
ObjectNode rpc = createDefaultRpc(); |
|||
String result = doPostAsync( |
|||
"/api/rpc/oneway/" + savedDevice.getId().getId().toString(), |
|||
JacksonUtil.toString(rpc), |
|||
String.class, |
|||
status().isOk() |
|||
); |
|||
String rpcId = JacksonUtil.fromString(result, JsonNode.class) |
|||
.get("rpcId") |
|||
.asText(); |
|||
Rpc savedRpc = getRpcById(rpcId); |
|||
|
|||
Assert.assertNotNull(savedRpc); |
|||
Assert.assertEquals(savedDevice.getId(), savedRpc.getDeviceId()); |
|||
} |
|||
|
|||
@Test |
|||
public void testDeleteRpc() throws Exception { |
|||
Device device = createDefaultDevice(); |
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
|
|||
ObjectNode rpc = createDefaultRpc(); |
|||
String result = doPostAsync( |
|||
"/api/rpc/oneway/" + savedDevice.getId().getId().toString(), |
|||
JacksonUtil.toString(rpc), |
|||
String.class, |
|||
status().isOk() |
|||
); |
|||
String rpcId = JacksonUtil.fromString(result, JsonNode.class) |
|||
.get("rpcId") |
|||
.asText(); |
|||
Rpc savedRpc = getRpcById(rpcId); |
|||
|
|||
MvcResult mvcResult = removeRpcById(savedRpc.getId().getId().toString()); |
|||
MvcResult res = doGet("/api/rpc/persistent/" + rpcId) |
|||
.andExpect(status().isNotFound()) |
|||
.andReturn(); |
|||
|
|||
JsonNode deleteResponse = JacksonUtil.fromString(res.getResponse().getContentAsString(), JsonNode.class); |
|||
Assert.assertEquals(404, deleteResponse.get("status").asInt()); |
|||
|
|||
String url = "/api/rpc/persistent/device/" + savedDevice.getUuidId().toString() |
|||
+ "?" + "page=0" + "&" + |
|||
"pageSize=" + Integer.MAX_VALUE + "&" + |
|||
"rpcStatus=" + RpcStatus.DELETED.name(); |
|||
MvcResult byDeviceResult = doGet(url).andReturn(); |
|||
JsonNode byDeviceResponse = JacksonUtil.fromString(byDeviceResult.getResponse().getContentAsString(), JsonNode.class); |
|||
|
|||
Assert.assertEquals(500, byDeviceResponse.get("status").asInt()); |
|||
} |
|||
|
|||
@Test |
|||
public void testGetRpcsByDeviceId() throws Exception { |
|||
Device device = createDefaultDevice(); |
|||
Device savedDevice = doPost("/api/device", device, Device.class); |
|||
|
|||
ObjectNode rpc = createDefaultRpc(); |
|||
|
|||
String result = doPostAsync( |
|||
"/api/rpc/oneway/" + savedDevice.getId().getId().toString(), |
|||
JacksonUtil.toString(rpc), |
|||
String.class, |
|||
status().isOk() |
|||
); |
|||
String rpcId = JacksonUtil.fromString(result, JsonNode.class) |
|||
.get("rpcId") |
|||
.asText(); |
|||
|
|||
String url = "/api/rpc/persistent/device/" + savedDevice.getId().getId() |
|||
+ "?" + "page=0" + "&" + |
|||
"pageSize=" + Integer.MAX_VALUE + "&" + |
|||
"rpcStatus=" + RpcStatus.QUEUED; |
|||
|
|||
MvcResult byDeviceResult = doGetAsync(url).andReturn(); |
|||
|
|||
List<Rpc> byDeviceRpcs = JacksonUtil.fromString( |
|||
byDeviceResult |
|||
.getResponse() |
|||
.getContentAsString(), |
|||
new TypeReference<PageData<Rpc>>() {} |
|||
).getData(); |
|||
|
|||
|
|||
boolean found = byDeviceRpcs.stream().anyMatch(r -> |
|||
r.getUuidId().toString().equals(rpcId) |
|||
&& r.getDeviceId().equals(savedDevice.getId()) |
|||
); |
|||
|
|||
Assert.assertTrue(found); |
|||
} |
|||
} |
|||
@ -0,0 +1,54 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.queue.memory; |
|||
|
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.thingsboard.server.queue.TbQueueMsg; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.mockito.Mockito.mock; |
|||
|
|||
public class InMemoryStorageTest { |
|||
|
|||
InMemoryStorage storage = InMemoryStorage.getInstance(); |
|||
|
|||
@Before |
|||
public void setUp() { |
|||
storage.cleanup(); |
|||
} |
|||
|
|||
@After |
|||
public void tearDown() { |
|||
storage.cleanup(); |
|||
} |
|||
|
|||
@Test |
|||
public void givenStorage_whenGetLagTotal_thenReturnInteger() throws InterruptedException { |
|||
assertThat(storage.getLagTotal()).isEqualTo(0); |
|||
storage.put("main", mock(TbQueueMsg.class)); |
|||
assertThat(storage.getLagTotal()).isEqualTo(1); |
|||
storage.put("main", mock(TbQueueMsg.class)); |
|||
assertThat(storage.getLagTotal()).isEqualTo(2); |
|||
storage.put("hp", mock(TbQueueMsg.class)); |
|||
assertThat(storage.getLagTotal()).isEqualTo(3); |
|||
storage.get("main"); |
|||
assertThat(storage.getLagTotal()).isEqualTo(1); |
|||
storage.cleanup(); |
|||
assertThat(storage.getLagTotal()).isEqualTo(0); |
|||
} |
|||
} |
|||
@ -0,0 +1,92 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.model; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonIgnore; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.ToString; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.data.device.profile.lwm2m.ObjectAttributes; |
|||
|
|||
import java.util.HashSet; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
|
|||
import static org.thingsboard.common.util.CollectionsUtil.diffSets; |
|||
|
|||
@Data |
|||
@NoArgsConstructor |
|||
@ToString(exclude = "toCancelRead") |
|||
@Slf4j |
|||
public class LwM2MModelConfig { |
|||
private String endpoint; |
|||
private Map<String, ObjectAttributes> attributesToAdd; |
|||
private Set<String> attributesToRemove; |
|||
private Set<String> toObserve; |
|||
private Set<String> toCancelObserve; |
|||
private Set<String> toRead; |
|||
@JsonIgnore |
|||
private Set<String> toCancelRead; |
|||
|
|||
public LwM2MModelConfig(String endpoint) { |
|||
this.endpoint = endpoint; |
|||
this.attributesToAdd = new ConcurrentHashMap<>(); |
|||
this.attributesToRemove = ConcurrentHashMap.newKeySet(); |
|||
this.toObserve = ConcurrentHashMap.newKeySet(); |
|||
this.toCancelObserve = ConcurrentHashMap.newKeySet(); |
|||
this.toRead = ConcurrentHashMap.newKeySet(); |
|||
this.toCancelRead = new HashSet<>(); |
|||
} |
|||
|
|||
public void merge(LwM2MModelConfig modelConfig) { |
|||
if (modelConfig.isEmpty() && modelConfig.getToCancelRead().isEmpty()) { |
|||
return; |
|||
} |
|||
|
|||
modelConfig.getAttributesToAdd().forEach((k, v) -> { |
|||
if (this.attributesToRemove.contains(k)) { |
|||
this.attributesToRemove.remove(k); |
|||
} else { |
|||
this.attributesToAdd.put(k, v); |
|||
} |
|||
}); |
|||
|
|||
modelConfig.getAttributesToRemove().forEach(k -> { |
|||
if (this.attributesToAdd.containsKey(k)) { |
|||
this.attributesToRemove.remove(k); |
|||
} else { |
|||
this.attributesToRemove.add(k); |
|||
} |
|||
}); |
|||
|
|||
this.toObserve.addAll(diffSets(this.toCancelObserve, modelConfig.getToObserve())); |
|||
this.toCancelObserve.addAll(diffSets(this.toObserve, modelConfig.getToCancelObserve())); |
|||
|
|||
this.toObserve.removeAll(modelConfig.getToCancelObserve()); |
|||
this.toCancelObserve.removeAll(modelConfig.getToObserve()); |
|||
|
|||
this.toRead.removeAll(modelConfig.getToObserve()); |
|||
this.toRead.removeAll(modelConfig.getToCancelRead()); |
|||
this.toRead.addAll(modelConfig.getToRead()); |
|||
} |
|||
|
|||
@JsonIgnore |
|||
public boolean isEmpty() { |
|||
return attributesToAdd.isEmpty() && toObserve.isEmpty() && toCancelObserve.isEmpty() && toRead.isEmpty(); |
|||
} |
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.model; |
|||
|
|||
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; |
|||
|
|||
public interface LwM2MModelConfigService { |
|||
|
|||
void sendUpdates(LwM2mClient lwM2mClient); |
|||
|
|||
void sendUpdates(LwM2mClient lwM2mClient, LwM2MModelConfig modelConfig); |
|||
|
|||
void persistUpdates(String endpoint); |
|||
|
|||
void removeUpdates(String endpoint); |
|||
} |
|||
@ -0,0 +1,235 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.model; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Lazy; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.device.profile.lwm2m.ObjectAttributes; |
|||
import org.thingsboard.server.queue.util.AfterStartUp; |
|||
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; |
|||
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; |
|||
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveCallback; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveRequest; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveCallback; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveRequest; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadCallback; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadRequest; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback; |
|||
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest; |
|||
import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; |
|||
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore; |
|||
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; |
|||
|
|||
import javax.annotation.PreDestroy; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.TimeoutException; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@TbLwM2mTransportComponent |
|||
public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService { |
|||
|
|||
@Autowired |
|||
private TbLwM2MModelConfigStore modelStore; |
|||
|
|||
@Autowired |
|||
@Lazy |
|||
private LwM2mDownlinkMsgHandler downlinkMsgHandler; |
|||
@Autowired |
|||
@Lazy |
|||
private LwM2mUplinkMsgHandler uplinkMsgHandler; |
|||
@Autowired |
|||
@Lazy |
|||
private LwM2mClientContext clientContext; |
|||
|
|||
@Autowired |
|||
private LwM2MTelemetryLogService logService; |
|||
|
|||
private ConcurrentMap<String, LwM2MModelConfig> currentModelConfigs; |
|||
|
|||
@AfterStartUp(order = Integer.MAX_VALUE - 1) |
|||
private void init() { |
|||
List<LwM2MModelConfig> models = modelStore.getAll(); |
|||
log.debug("Fetched model configs: {}", models); |
|||
currentModelConfigs = models.stream() |
|||
.collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m)); |
|||
} |
|||
|
|||
@Override |
|||
public void sendUpdates(LwM2mClient lwM2mClient) { |
|||
LwM2MModelConfig modelConfig = currentModelConfigs.get(lwM2mClient.getEndpoint()); |
|||
if (modelConfig == null || modelConfig.isEmpty()) { |
|||
return; |
|||
} |
|||
|
|||
doSend(lwM2mClient, modelConfig); |
|||
} |
|||
|
|||
public void sendUpdates(LwM2mClient lwM2mClient, LwM2MModelConfig newModelConfig) { |
|||
String endpoint = lwM2mClient.getEndpoint(); |
|||
LwM2MModelConfig modelConfig = currentModelConfigs.get(endpoint); |
|||
if (modelConfig == null || modelConfig.isEmpty()) { |
|||
modelConfig = newModelConfig; |
|||
currentModelConfigs.put(endpoint, modelConfig); |
|||
} else { |
|||
modelConfig.merge(newModelConfig); |
|||
} |
|||
|
|||
if (lwM2mClient.isAsleep()) { |
|||
modelStore.put(modelConfig); |
|||
} else { |
|||
doSend(lwM2mClient, modelConfig); |
|||
} |
|||
} |
|||
|
|||
private void doSend(LwM2mClient lwM2mClient, LwM2MModelConfig modelConfig) { |
|||
log.trace("Send LwM2M Model updates: [{}]", modelConfig); |
|||
|
|||
String endpoint = lwM2mClient.getEndpoint(); |
|||
|
|||
Map<String, ObjectAttributes> attrToAdd = modelConfig.getAttributesToAdd(); |
|||
attrToAdd.forEach((id, attributes) -> { |
|||
TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(id) |
|||
.attributes(attributes) |
|||
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); |
|||
downlinkMsgHandler.sendWriteAttributesRequest(lwM2mClient, request, |
|||
createDownlinkProxyCallback(() -> { |
|||
attrToAdd.remove(id); |
|||
if (modelConfig.isEmpty()) { |
|||
modelStore.remove(endpoint); |
|||
} |
|||
}, new TbLwM2MWriteAttributesCallback(logService, lwM2mClient, id)) |
|||
); |
|||
}); |
|||
|
|||
Set<String> attrToRemove = modelConfig.getAttributesToRemove(); |
|||
attrToRemove.forEach((id) -> { |
|||
TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(id) |
|||
.attributes(new ObjectAttributes()) |
|||
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); |
|||
downlinkMsgHandler.sendWriteAttributesRequest(lwM2mClient, request, |
|||
createDownlinkProxyCallback(() -> { |
|||
attrToRemove.remove(id); |
|||
if (modelConfig.isEmpty()) { |
|||
modelStore.remove(endpoint); |
|||
} |
|||
}, new TbLwM2MWriteAttributesCallback(logService, lwM2mClient, id)) |
|||
); |
|||
}); |
|||
|
|||
Set<String> toRead = modelConfig.getToRead(); |
|||
toRead.forEach(id -> { |
|||
TbLwM2MReadRequest request = TbLwM2MReadRequest.builder().versionedId(id) |
|||
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); |
|||
downlinkMsgHandler.sendReadRequest(lwM2mClient, request, |
|||
createDownlinkProxyCallback(() -> { |
|||
toRead.remove(id); |
|||
if (modelConfig.isEmpty()) { |
|||
modelStore.remove(endpoint); |
|||
} |
|||
}, new TbLwM2MReadCallback(uplinkMsgHandler, logService, lwM2mClient, id)) |
|||
); |
|||
}); |
|||
|
|||
Set<String> toObserve = modelConfig.getToObserve(); |
|||
toObserve.forEach(id -> { |
|||
TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(id) |
|||
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); |
|||
downlinkMsgHandler.sendObserveRequest(lwM2mClient, request, |
|||
createDownlinkProxyCallback(() -> { |
|||
toObserve.remove(id); |
|||
if (modelConfig.isEmpty()) { |
|||
modelStore.remove(endpoint); |
|||
} |
|||
}, new TbLwM2MObserveCallback(uplinkMsgHandler, logService, lwM2mClient, id)) |
|||
); |
|||
}); |
|||
|
|||
Set<String> toCancelObserve = modelConfig.getToCancelObserve(); |
|||
toCancelObserve.forEach(id -> { |
|||
TbLwM2MCancelObserveRequest request = TbLwM2MCancelObserveRequest.builder().versionedId(id) |
|||
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); |
|||
downlinkMsgHandler.sendCancelObserveRequest(lwM2mClient, request, |
|||
createDownlinkProxyCallback(() -> { |
|||
toCancelObserve.remove(id); |
|||
if (modelConfig.isEmpty()) { |
|||
modelStore.remove(endpoint); |
|||
} |
|||
}, new TbLwM2MCancelObserveCallback(logService, lwM2mClient, id)) |
|||
); |
|||
}); |
|||
} |
|||
|
|||
private <R, T> DownlinkRequestCallback<R, T> createDownlinkProxyCallback(Runnable processRemove, DownlinkRequestCallback<R, T> callback) { |
|||
return new DownlinkRequestCallback<>() { |
|||
@Override |
|||
public void onSuccess(R request, T response) { |
|||
processRemove.run(); |
|||
callback.onSuccess(request, response); |
|||
} |
|||
|
|||
@Override |
|||
public void onValidationError(String params, String msg) { |
|||
processRemove.run(); |
|||
callback.onValidationError(params, msg); |
|||
} |
|||
|
|||
@Override |
|||
public void onError(String params, Exception e) { |
|||
try { |
|||
if (e instanceof TimeoutException) { |
|||
return; |
|||
} |
|||
processRemove.run(); |
|||
} finally { |
|||
callback.onError(params, e); |
|||
} |
|||
} |
|||
|
|||
}; |
|||
} |
|||
|
|||
@Override |
|||
public void persistUpdates(String endpoint) { |
|||
LwM2MModelConfig modelConfig = currentModelConfigs.get(endpoint); |
|||
if (modelConfig != null && !modelConfig.isEmpty()) { |
|||
modelStore.put(modelConfig); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void removeUpdates(String endpoint) { |
|||
currentModelConfigs.remove(endpoint); |
|||
} |
|||
|
|||
@PreDestroy |
|||
private void destroy() { |
|||
currentModelConfigs.values().forEach(model -> { |
|||
if (model != null && !model.isEmpty()) { |
|||
modelStore.put(model); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
@ -0,0 +1,38 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.store; |
|||
|
|||
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
|
|||
public class TbDummyLwM2MModelConfigStore implements TbLwM2MModelConfigStore { |
|||
@Override |
|||
public List<LwM2MModelConfig> getAll() { |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
@Override |
|||
public void put(LwM2MModelConfig modelConfig) { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void remove(String endpoint) { |
|||
|
|||
} |
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.store; |
|||
|
|||
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface TbLwM2MModelConfigStore { |
|||
List<LwM2MModelConfig> getAll(); |
|||
|
|||
void put(LwM2MModelConfig modelConfig); |
|||
|
|||
void remove(String endpoint); |
|||
} |
|||
@ -0,0 +1,79 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.transport.lwm2m.server.store; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.data.redis.connection.RedisClusterConnection; |
|||
import org.springframework.data.redis.connection.RedisConnectionFactory; |
|||
import org.springframework.data.redis.core.Cursor; |
|||
import org.springframework.data.redis.core.ScanOptions; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
@AllArgsConstructor |
|||
public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore { |
|||
private static final String MODEL_EP = "MODEL#EP#"; |
|||
private final RedisConnectionFactory connectionFactory; |
|||
|
|||
@Override |
|||
public List<LwM2MModelConfig> getAll() { |
|||
try (var connection = connectionFactory.getConnection()) { |
|||
List<LwM2MModelConfig> configs = new ArrayList<>(); |
|||
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(MODEL_EP + "*").build(); |
|||
List<Cursor<byte[]>> scans = new ArrayList<>(); |
|||
if (connection instanceof RedisClusterConnection) { |
|||
((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { |
|||
scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); |
|||
}); |
|||
} else { |
|||
scans.add(connection.scan(scanOptions)); |
|||
} |
|||
|
|||
scans.forEach(scan -> { |
|||
scan.forEachRemaining(key -> { |
|||
byte[] element = connection.get(key); |
|||
configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class)); |
|||
}); |
|||
}); |
|||
return configs; |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void put(LwM2MModelConfig modelConfig) { |
|||
byte[] clientSerialized = JacksonUtil.writeValueAsBytes(modelConfig); |
|||
try (var connection = connectionFactory.getConnection()) { |
|||
connection.getSet(getKey(modelConfig.getEndpoint()), clientSerialized); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void remove(String endpoint) { |
|||
try (var connection = connectionFactory.getConnection()) { |
|||
connection.del(getKey(endpoint)); |
|||
} |
|||
} |
|||
|
|||
private byte[] getKey(String endpoint) { |
|||
return (MODEL_EP + endpoint).getBytes(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2021 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.common.util; |
|||
|
|||
import java.util.Collection; |
|||
import java.util.Set; |
|||
import java.util.stream.Collectors; |
|||
|
|||
public class CollectionsUtil { |
|||
public static boolean isEmpty(Collection<?> collection) { |
|||
return collection == null || collection.isEmpty(); |
|||
} |
|||
|
|||
public static boolean isNotEmpty(Collection<?> collection) { |
|||
return !isEmpty(collection); |
|||
} |
|||
|
|||
/** |
|||
* Returns new set with elements that are present in set B(new) but absent in set A(old). |
|||
*/ |
|||
public static <T> Set<T> diffSets(Set<T> a, Set<T> b) { |
|||
return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toSet()); |
|||
} |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue