From 0bc5be477c585af7953d44824bfcd74a3c4ee50c Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 17 Aug 2022 16:55:22 +0300 Subject: [PATCH] Coap Device Profile Update fix --- .../coap/client/DefaultCoapClientContext.java | 43 +++++++++++++++++++ .../common/transport/DeviceDeletedEvent.java | 33 ++++++++++++++ .../transport/DeviceProfileUpdatedEvent.java | 31 +++++++++++++ .../service/DefaultTransportService.java | 14 +++--- 4 files changed, 116 insertions(+), 5 deletions(-) create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceDeletedEvent.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceProfileUpdatedEvent.java diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java index 1b3c936130..f8336025d4 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java @@ -23,6 +23,7 @@ import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.server.resources.CoapExchange; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Lazy; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.thingsboard.server.coapserver.CoapServerContext; import org.thingsboard.server.common.data.DataConstants; @@ -45,6 +46,8 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.common.transport.DeviceDeletedEvent; +import org.thingsboard.server.common.transport.DeviceUpdatedEvent; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportService; @@ -52,6 +55,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; +import org.thingsboard.server.common.transport.DeviceProfileUpdatedEvent; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.transport.coap.CoapTransportContext; @@ -97,6 +101,45 @@ public class DefaultCoapClientContext implements CoapClientContext { this.partitionService = partitionService; } + @EventListener(DeviceProfileUpdatedEvent.class) + public void onApplicationEvent(DeviceProfileUpdatedEvent event) { + var deviceProfile = event.getDeviceProfile(); + clients.values().stream().filter(state -> state.getSession() == null).forEach(state -> { + state.lock(); + try { + if (deviceProfile.getId().equals(state.getProfileId())) { + initStateAdaptor(deviceProfile, state); + } + } catch (AdaptorException e) { + log.trace("[{}] Failed to update client state due to: ", state.getDeviceId(), e); + } finally { + state.unlock(); + } + }); + } + + @EventListener(DeviceUpdatedEvent.class) + public void onApplicationEvent(DeviceUpdatedEvent event) { + var device = event.getDevice(); + var state = clients.get(device.getId()); + if (state == null) { + return; + } + state.lock(); + try { + if (state.getSession() == null) { + clients.remove(device.getId()); + } + } finally { + state.unlock(); + } + } + + @EventListener(DeviceDeletedEvent.class) + public void onApplicationEvent(DeviceDeletedEvent event) { + clients.remove(event.getDeviceId()); + } + @Override public boolean registerAttributeObservation(TbCoapClientState clientState, String token, CoapExchange exchange) { return registerFeatureObservation(clientState, token, exchange, FeatureType.ATTRIBUTES); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceDeletedEvent.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceDeletedEvent.java new file mode 100644 index 0000000000..9a7cdbefdd --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceDeletedEvent.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport; + +import lombok.Getter; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.queue.discovery.event.TbApplicationEvent; + +public final class DeviceDeletedEvent extends TbApplicationEvent { + + private static final long serialVersionUID = -7453664970966733857L; + @Getter + private final DeviceId deviceId; + + public DeviceDeletedEvent(DeviceId deviceId) { + super(new Object()); + this.deviceId = deviceId; + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceProfileUpdatedEvent.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceProfileUpdatedEvent.java new file mode 100644 index 0000000000..ca95a636d3 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/DeviceProfileUpdatedEvent.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport; + +import lombok.Getter; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.queue.discovery.event.TbApplicationEvent; + +public final class DeviceProfileUpdatedEvent extends TbApplicationEvent { + + @Getter + private final DeviceProfile deviceProfile; + + public DeviceProfileUpdatedEvent(DeviceProfile deviceProfile) { + super(new Object()); + this.deviceProfile = deviceProfile; + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index a63e4a01a2..8b532ae92e 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -44,7 +44,6 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; @@ -58,6 +57,8 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.stats.MessagesStats; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; +import org.thingsboard.server.common.transport.DeviceDeletedEvent; +import org.thingsboard.server.common.transport.DeviceProfileUpdatedEvent; import org.thingsboard.server.common.transport.DeviceUpdatedEvent; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportDeviceProfileCache; @@ -925,10 +926,7 @@ public class DefaultTransportService implements TransportService { } } else if (EntityType.DEVICE.equals(entityType)) { Optional deviceOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); - deviceOpt.ifPresent(device -> { - onDeviceUpdate(device); - eventPublisher.publishEvent(new DeviceUpdatedEvent(device)); - }); + deviceOpt.ifPresent(this::onDeviceUpdate); } } else if (toSessionMsg.hasEntityDeleteMsg()) { TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg(); @@ -994,6 +992,8 @@ public class DefaultTransportService implements TransportService { transportCallbackExecutor.submit(() -> md.getListener().onDeviceProfileUpdate(newSessionInfo, deviceProfile)); } }); + + eventPublisher.publishEvent(new DeviceProfileUpdatedEvent(deviceProfile)); } private void onDeviceUpdate(Device device) { @@ -1027,6 +1027,8 @@ public class DefaultTransportService implements TransportService { transportCallbackExecutor.submit(() -> md.getListener().onDeviceUpdate(newSessionInfo, device, Optional.ofNullable(newDeviceProfile))); } }); + + eventPublisher.publishEvent(new DeviceUpdatedEvent(device)); } private void onDeviceDeleted(DeviceId deviceId) { @@ -1038,6 +1040,8 @@ public class DefaultTransportService implements TransportService { }); } }); + + eventPublisher.publishEvent(new DeviceDeletedEvent(deviceId)); } protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) {