Browse Source

Added tenant id to the methods with locks and minor refactoring

pull/11501/head
YevhenBondarenko 2 years ago
parent
commit
bc0022aafd
  1. 8
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java
  2. 64
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java
  3. 2
      application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java
  4. 2
      application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java
  5. 2
      application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java
  6. 2
      application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java
  7. 2
      application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java
  8. 6
      application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java
  9. 6
      application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java
  10. 22
      application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java
  11. 3
      application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java
  12. 2
      application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java
  13. 2
      common/proto/src/main/proto/queue.proto

8
application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java

@ -118,7 +118,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
}
callback.onSuccess();
if (event.hasTsOrAttrSub()) {
sendSubEventCallback(serviceId, entityId, event.getSeqNumber());
sendSubEventCallback(tenantId, serviceId, entityId, event.getSeqNumber());
}
} else {
log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}"
@ -142,12 +142,12 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
}
}
private void sendSubEventCallback(String targetId, EntityId entityId, int seqNumber) {
private void sendSubEventCallback(TenantId tenantId, String targetId, EntityId entityId, int seqNumber) {
var update = getEntityUpdatesInfo(entityId);
if (serviceId.equals(targetId)) {
localSubscriptionService.onSubEventCallback(entityId, seqNumber, update, TbCallback.EMPTY);
localSubscriptionService.onSubEventCallback(tenantId, entityId, seqNumber, update, TbCallback.EMPTY);
} else {
sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(entityId.getId(), seqNumber, update));
sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(tenantId, entityId.getId(), seqNumber, update));
}
}

64
application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java

@ -213,30 +213,28 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
@Override
public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) {
UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB());
onSubEventCallback(entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback);
TenantId tenantId = TenantId.fromUUID(new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB()));
onSubEventCallback(tenantId, entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback);
}
@Override
public void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
onSubEventCallback(entityId.getId(), seqNumber, entityUpdatesInfo, callback);
public void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
onSubEventCallback(tenantId, entityId.getId(), seqNumber, entityUpdatesInfo, callback);
}
public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo);
public void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
log.debug("[{}][{}][{}] Processing sub event callback: {}.", tenantId, entityId, seqNumber, entityUpdatesInfo);
entityUpdates.put(entityId, entityUpdatesInfo);
Set<TbSubscription<?>> pendingSubs = null;
Lock subsLock = null;
Lock subsLock = getSubsLock(tenantId);
subsLock.lock();
try {
TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId);
if (entitySubs != null) {
subsLock = getSubsLock(entitySubs.getTenantId());
subsLock.lock();
pendingSubs = entitySubs.clearPendingSubscriptions(seqNumber);
}
} finally {
if (subsLock != null) {
subsLock.unlock();
}
subsLock.unlock();
}
if (pendingSubs != null) {
pendingSubs.forEach(this::checkMissedUpdates);
@ -245,31 +243,29 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
}
@Override
public void cancelSubscription(String sessionId, int subscriptionId) {
log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
public void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId) {
log.debug("[{}][{}][{}] Going to remove subscription.", tenantId, sessionId, subscriptionId);
SubscriptionModificationResult result = null;
Lock subsLock = null;
Lock subsLock = getSubsLock(tenantId);
subsLock.lock();
try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
if (sessionSubscriptions != null) {
TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId);
if (subscription != null) {
subsLock = getSubsLock(subscription.getTenantId());
subsLock.lock();
if (sessionSubscriptions.isEmpty()) {
subscriptionsBySessionId.remove(sessionId);
}
result = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false);
} else {
log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
log.debug("[{}][{}][{}] Subscription not found!", tenantId, sessionId, subscriptionId);
}
} else {
log.debug("[{}] No session subscriptions found!", sessionId);
log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId);
}
} finally {
if (subsLock != null) {
subsLock.unlock();
}
subsLock.unlock();
}
if (result != null && result.hasEvent()) {
pushSubscriptionEvent(result);
@ -277,27 +273,22 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
}
@Override
public void cancelAllSessionSubscriptions(String sessionId) {
log.debug("[{}] Going to remove session subscriptions.", sessionId);
public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) {
log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId);
List<SubscriptionModificationResult> results = new ArrayList<>();
Lock subsLock = null;
Lock subsLock = getSubsLock(tenantId);
subsLock.lock();
try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId);
if (sessionSubscriptions != null) {
for (TbSubscription<?> subscription : sessionSubscriptions.values()) {
if (subsLock == null) {
subsLock = getSubsLock(subscription.getTenantId());
subsLock.lock();
}
results.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false));
results.add(modifySubscription(tenantId, subscription.getEntityId(), subscription, false));
}
} else {
log.debug("[{}] No session subscriptions found!", sessionId);
log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId);
}
} finally {
if (subsLock != null) {
subsLock.unlock();
}
subsLock.unlock();
}
results.stream().filter(SubscriptionModificationResult::hasEvent).forEach(this::pushSubscriptionEvent);
}
@ -602,7 +593,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
}
private void cleanupStaleSessions() {
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale);
subscriptionsBySessionId.forEach((sessionId, subscriptions) ->
subscriptions.values()
.stream()
.findAny()
.ifPresent(subscription -> webSocketService.cleanupIfStale(subscription.getTenantId(), sessionId))
);
}
private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) {

2
application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java

@ -113,7 +113,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
public void clearEntitySubscriptions() {
if (subToEntityIdMap != null) {
for (Integer subId : subToEntityIdMap.keySet()) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId);
localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId);
}
subToEntityIdMap.clear();
}

2
application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractSubCtx.java

@ -303,7 +303,7 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
protected void clearDynamicValueSubscriptions() {
if (subToDynamicValueKeySet != null) {
for (Integer subId : subToDynamicValueKeySet) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId);
localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subId);
}
subToDynamicValueKeySet.clear();
}

2
application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java

@ -341,7 +341,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
long startTs = System.currentTimeMillis() - query.getPageLink().getTimeWindow();
newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity));
}
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId));
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
}

2
application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmsSubscription.java

@ -36,7 +36,7 @@ public class TbAlarmsSubscription extends TbSubscription<AlarmSubscriptionUpdate
}
@Override
protected boolean canEqual(final Object other) {
public boolean canEqual(final Object other) {
return other instanceof TbAlarmsSubscription;
}

2
application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java

@ -225,7 +225,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
}
}
}
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId));
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
}

6
application/src/main/java/org/thingsboard/server/service/subscription/TbLocalSubscriptionService.java

@ -34,11 +34,11 @@ public interface TbLocalSubscriptionService {
void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback);
void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty);
void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty);
void cancelSubscription(String sessionId, int subscriptionId);
void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId);
void cancelAllSessionSubscriptions(String sessionId);
void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId);
void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback);

6
application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java

@ -82,10 +82,12 @@ public class TbSubscriptionUtils {
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build();
}
public static ToCoreNotificationMsg toProto(UUID id, int seqNumber, TbEntityUpdatesInfo update) {
public static ToCoreNotificationMsg toProto(TenantId tenantId, UUID id, int seqNumber, TbEntityUpdatesInfo update) {
TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder()
.setEntityIdMSB(id.getMostSignificantBits())
.setEntityIdLSB(id.getLeastSignificantBits())
.setEntityIdLSB(tenantId.getId().getLeastSignificantBits())
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(id.getLeastSignificantBits())
.setSeqNumber(seqNumber)
.setAttributesUpdateTs(update.attributesUpdateTs)
.setTimeSeriesUpdateTs(update.timeSeriesUpdateTs);

22
application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java

@ -193,17 +193,18 @@ public class DefaultWebSocketService implements WebSocketService {
@Override
public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent event) {
String sessionId = sessionRef.getSessionId();
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
log.debug(PROCESSING_MSG, sessionId, event);
switch (event.getEventType()) {
case ESTABLISHED:
wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef));
break;
case ERROR:
log.debug("[{}] Unknown websocket session error: ", sessionId,
log.debug("[{}][{}] Unknown websocket session error: ", tenantId, sessionId,
event.getError().orElse(new RuntimeException("No error specified")));
break;
case CLOSED:
cleanupSessionById(sessionId);
cleanupSessionById(tenantId, sessionId);
processSessionClose(sessionRef);
break;
}
@ -297,10 +298,10 @@ public class DefaultWebSocketService implements WebSocketService {
}
@Override
public void cleanupIfStale(String sessionId) {
public void cleanupIfStale(TenantId tenantId, String sessionId) {
if (!msgEndpoint.isOpen(sessionId)) {
log.info("[{}] Cleaning up stale session ", sessionId);
cleanupSessionById(sessionId);
cleanupSessionById(tenantId, sessionId);
}
}
@ -754,22 +755,23 @@ public class DefaultWebSocketService implements WebSocketService {
}
private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
log.warn("[{}][{}] Cleanup session due to empty entity id.", sessionId, cmd.getCmdId());
cleanupSessionById(sessionId);
log.warn("[{}][{}][{}] Cleanup session due to empty entity id.", tenantId, sessionId, cmd.getCmdId());
cleanupSessionById(tenantId, sessionId);
} else {
Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId());
if (subId == null) {
log.trace("[{}][{}] Failed to lookup subscription id mapping", sessionId, cmd.getCmdId());
log.trace("[{}][{}][{}] Failed to lookup subscription id mapping", tenantId, sessionId, cmd.getCmdId());
subId = cmd.getCmdId();
}
oldSubService.cancelSubscription(sessionId, subId);
oldSubService.cancelSubscription(tenantId, sessionId, subId);
}
}
private void cleanupSessionById(String sessionId) {
private void cleanupSessionById(TenantId tenantId, String sessionId) {
wsSessionsMap.remove(sessionId);
oldSubService.cancelAllSessionSubscriptions(sessionId);
oldSubService.cancelAllSessionSubscriptions(tenantId, sessionId);
sessionCmdMap.remove(sessionId);
entityDataSubService.cancelAllSessionSubscriptions(sessionId);
}

3
application/src/main/java/org/thingsboard/server/service/ws/WebSocketService.java

@ -16,6 +16,7 @@
package org.thingsboard.server.service.ws;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
@ -37,6 +38,6 @@ public interface WebSocketService {
void close(String sessionId, CloseStatus status);
void cleanupIfStale(String sessionId);
void cleanupIfStale(TenantId tenantId, String sessionId);
}

2
application/src/main/java/org/thingsboard/server/service/ws/notification/DefaultNotificationCommandsHandler.java

@ -242,7 +242,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
@Override
public void handleUnsubCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), cmd.getCmdId());
localSubscriptionService.cancelSubscription(sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSessionId(), cmd.getCmdId());
}
private void sendUpdate(String sessionId, CmdUpdate update) {

2
common/proto/src/main/proto/queue.proto

@ -817,6 +817,8 @@ message TbEntitySubEventCallbackProto {
int32 seqNumber = 3;
int64 attributesUpdateTs = 4;
int64 timeSeriesUpdateTs = 5;
int64 tenantIdMSB = 6;
int64 tenantIdLSB = 7;
}
message TsValueProto {

Loading…
Cancel
Save