@ -168,17 +168,17 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
TenantId tenantId = subscription . getTenantId ( ) ;
EntityId entityId = subscription . getEntityId ( ) ;
log . debug ( "[{}][{}] Register subscription: {}" , tenantId , entityId , subscription ) ;
Modify SubscriptionResult modifySubsc riptionR esult;
SubscriptionModifica tionResult result ;
subsLock . lock ( ) ;
try {
Map < Integer , TbSubscription < ? > > sessionSubscriptions = subscriptionsBySessionId . computeIfAbsent ( subscription . getSessionId ( ) , k - > new ConcurrentHashMap < > ( ) ) ;
sessionSubscriptions . put ( subscription . getSubscriptionId ( ) , subscription ) ;
modifySubsc riptionR esult = modifySubscription ( tenantId , entityId , subscription , true ) ;
result = modifySubscription ( tenantId , entityId , subscription , true ) ;
} finally {
subsLock . unlock ( ) ;
}
if ( modifySubsc riptionR esult. hasEvent ( ) ) {
pushSubscriptionEvent ( modifySubsc riptionR esult) ;
if ( result . hasEvent ( ) ) {
pushSubscriptionEvent ( result ) ;
}
}
@ -215,7 +215,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
@Override
public void cancelSubscription ( String sessionId , int subscriptionId ) {
log . debug ( "[{}][{}] Going to remove subscription." , sessionId , subscriptionId ) ;
Modify SubscriptionResult modifySubsc riptionR esult = null ;
SubscriptionModifica tionResult result = null ;
subsLock . lock ( ) ;
try {
Map < Integer , TbSubscription < ? > > sessionSubscriptions = subscriptionsBySessionId . get ( sessionId ) ;
@ -225,7 +225,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
if ( sessionSubscriptions . isEmpty ( ) ) {
subscriptionsBySessionId . remove ( sessionId ) ;
}
modifySubsc riptionR esult = modifySubscription ( subscription . getTenantId ( ) , subscription . getEntityId ( ) , subscription , false ) ;
result = modifySubscription ( subscription . getTenantId ( ) , subscription . getEntityId ( ) , subscription , false ) ;
} else {
log . debug ( "[{}][{}] Subscription not found!" , sessionId , subscriptionId ) ;
}
@ -235,21 +235,21 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} finally {
subsLock . unlock ( ) ;
}
if ( modifySubsc riptionR esult ! = null & & modifySubsc riptionR esult. hasEvent ( ) ) {
pushSubscriptionEvent ( modifySubsc riptionR esult) ;
if ( result ! = null & & result . hasEvent ( ) ) {
pushSubscriptionEvent ( result ) ;
}
}
@Override
public void cancelAllSessionSubscriptions ( String sessionId ) {
log . debug ( "[{}] Going to remove session subscriptions." , sessionId ) ;
List < Modify SubscriptionResult> result = new ArrayList < > ( ) ;
List < SubscriptionModifica tionResult > results = new ArrayList < > ( ) ;
subsLock . lock ( ) ;
try {
Map < Integer , TbSubscription < ? > > sessionSubscriptions = subscriptionsBySessionId . remove ( sessionId ) ;
if ( sessionSubscriptions ! = null ) {
for ( TbSubscription < ? > subscription : sessionSubscriptions . values ( ) ) {
result . add ( modifySubscription ( subscription . getTenantId ( ) , subscription . getEntityId ( ) , subscription , false ) ) ;
results . add ( modifySubscription ( subscription . getTenantId ( ) , subscription . getEntityId ( ) , subscription , false ) ) ;
}
} else {
log . debug ( "[{}] No session subscriptions found!" , sessionId ) ;
@ -257,7 +257,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} finally {
subsLock . unlock ( ) ;
}
result . stream ( ) . filter ( Modify SubscriptionResult: : hasEvent ) . forEach ( this : : pushSubscriptionEvent ) ;
results . stream ( ) . filter ( SubscriptionModifica tionResult : : hasEvent ) . forEach ( this : : pushSubscriptionEvent ) ;
}
@Override
@ -420,7 +420,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
callback . onSuccess ( ) ;
}
private Modify SubscriptionResult modifySubscription ( TenantId tenantId , EntityId entityId , TbSubscription < ? > subscription , boolean add ) {
private SubscriptionModifica tionResult modifySubscription ( TenantId tenantId , EntityId entityId , TbSubscription < ? > subscription , boolean add ) {
TbSubscription < ? > missedUpdatesCandidate = null ;
TbEntitySubEvent event = null ;
try {
@ -435,20 +435,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} catch ( Exception e ) {
log . warn ( "[{}][{}] Failed to {} subscription {} due to " , tenantId , entityId , add ? "add" : "remove" , subscription , e ) ;
}
return new Modify SubscriptionResult( tenantId , entityId , subscription , missedUpdatesCandidate , event ) ;
return new SubscriptionModifica tionResult ( tenantId , entityId , subscription , missedUpdatesCandidate , event ) ;
}
private void pushSubscriptionEvent ( ModifySubscriptionResult modifySub Result) {
private void pushSubscriptionEvent ( SubscriptionModificationResult modification Result) {
try {
TbEntitySubEvent event = modifySub Result . getEvent ( ) ;
log . trace ( "[{}][{}][{}] Event: {}" , modifySub Result . getTenantId ( ) , modifySub Result . getEntityId ( ) , modifySub Result . getSubscription ( ) . getSubscriptionId ( ) , event ) ;
pushSubEventToManagerService ( modifySub Result . getTenantId ( ) , modifySub Result . getEntityId ( ) , event ) ;
TbSubscription < ? > missedUpdatesCandidate = modifySub Result . getMissedUpdatesCandidate ( ) ;
TbEntitySubEvent event = modification Result . getEvent ( ) ;
log . trace ( "[{}][{}][{}] Event: {}" , modification Result . getTenantId ( ) , modification Result . getEntityId ( ) , modification Result . getSubscription ( ) . getSubscriptionId ( ) , event ) ;
pushSubEventToManagerService ( modification Result . getTenantId ( ) , modification Result . getEntityId ( ) , event ) ;
TbSubscription < ? > missedUpdatesCandidate = modification Result . getMissedUpdatesCandidate ( ) ;
if ( missedUpdatesCandidate ! = null ) {
checkMissedUpdates ( missedUpdatesCandidate ) ;
}
} catch ( Exception e ) {
log . warn ( "[{}][{}] Failed to push subscription event {} due to " , modifySub Result . getTenantId ( ) , modifySub Result . getEntityId ( ) , modifySub Result . getEvent ( ) , e ) ;
log . warn ( "[{}][{}] Failed to push subscription event {} due to " , modification Result . getTenantId ( ) , modification Result . getEntityId ( ) , modification Result . getEvent ( ) , e ) ;
}
}