@ -63,68 +63,76 @@ public class ZkDiscoveryServiceTest {
@Mock
private PathChildrenCache cache ;
private ScheduledExecutorService zkExecutorService ;
@Mock
private CuratorFramework curatorFramework ;
private ZkDiscoveryService zkDiscoveryService ;
private static final long RECALCULATE_DELAY = 100L ;
final TransportProtos . ServiceInfo currentInfo = TransportProtos . ServiceInfo . newBuilder ( ) . setServiceId ( "tb-rule-engine-0" ) . build ( ) ;
final ChildData currentData = new ChildData ( "/thingsboard/nodes/0000000010" , null , currentInfo . toByteArray ( ) ) ;
final TransportProtos . ServiceInfo childInfo = TransportProtos . ServiceInfo . newBuilder ( ) . setServiceId ( "tb-rule-engine-1" ) . build ( ) ;
final ChildData childData = new ChildData ( "/thingsboard/nodes/0000000020" , null , childInfo . toByteArray ( ) ) ;
@Before
public void setup ( ) {
zkDiscoveryService = Mockito . spy ( new ZkDiscoveryService ( serviceInfoProvider , partitionService ) ) ;
zkExecutorService = Executors . newSingleThreadScheduledExecutor ( ThingsBoardThreadFactory . forName ( "zk-discovery" ) ) ;
ScheduledExecutorService zkExecutorService = Executors . newSingleThreadScheduledExecutor ( ThingsBoardThreadFactory . forName ( "zk-discovery" ) ) ;
when ( client . getState ( ) ) . thenReturn ( CuratorFrameworkState . STARTED ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "stopped" , false ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "client" , client ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "cache" , cache ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "nodePath" , "/thingsboard/nodes/0000000010" ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "zkExecutorService" , zkExecutorService ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "recalculateDelay" , 1000L ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "recalculateDelay" , RECALCULATE_DELAY ) ;
ReflectionTestUtils . setField ( zkDiscoveryService , "zkDir" , "/thingsboard" ) ;
}
@Test
public void restartNodeTest ( ) throws Exception {
var currentInfo = TransportProtos . ServiceInfo . newBuilder ( ) . setServiceId ( "currentId" ) . build ( ) ;
var currentData = new ChildData ( "/thingsboard/nodes/0000000010" , null , currentInfo . toByteArray ( ) ) ;
var childInfo = TransportProtos . ServiceInfo . newBuilder ( ) . setServiceId ( "childId" ) . build ( ) ;
var childData = new ChildData ( "/thingsboard/nodes/0000000020" , null , childInfo . toByteArray ( ) ) ;
when ( serviceInfoProvider . getServiceInfo ( ) ) . thenReturn ( currentInfo ) ;
List < ChildData > dataList = new ArrayList < > ( ) ;
dataList . add ( currentData ) ;
when ( cache . getCurrentData ( ) ) . thenReturn ( dataList ) ;
}
@Test
public void restartNodeInTimeTest ( ) throws Exception {
startNode ( childData ) ;
verify ( partitionService , times ( 1 ) ) . recalculatePartitions ( eq ( currentInfo ) , eq ( List . of ( childInfo ) ) ) ;
reset ( partitionService ) ;
//Restart in timeAssert.assertTrue(zkDiscoveryService.delayedTasks.isEmpty());
stopNode ( childData ) ;
assertEquals ( 1 , zkDiscoveryService . delayedTasks . size ( ) ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( eq ( currentInfo ) , any ( ) ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( any ( ) , any ( ) ) ;
startNode ( childData ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( eq ( currentInfo ) , any ( ) ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( any ( ) , any ( ) ) ;
Thread . sleep ( 2000 ) ;
Thread . sleep ( RECALCULATE_DELAY * 2 ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( eq ( currentInfo ) , any ( ) ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( any ( ) , any ( ) ) ;
assertTrue ( zkDiscoveryService . delayedTasks . isEmpty ( ) ) ;
}
@Test
public void restartNodeNotInTimeTest ( ) throws Exception {
startNode ( childData ) ;
verify ( partitionService , times ( 1 ) ) . recalculatePartitions ( eq ( currentInfo ) , eq ( List . of ( childInfo ) ) ) ;
reset ( partitionService ) ;
//Restart not in time
stopNode ( childData ) ;
assertEquals ( 1 , zkDiscoveryService . delayedTasks . size ( ) ) ;
Thread . sleep ( 2000 ) ;
Thread . sleep ( RECALCULATE_DELAY * 2 ) ;
assertTrue ( zkDiscoveryService . delayedTasks . isEmpty ( ) ) ;
@ -135,11 +143,19 @@ public class ZkDiscoveryServiceTest {
verify ( partitionService , times ( 1 ) ) . recalculatePartitions ( eq ( currentInfo ) , eq ( List . of ( childInfo ) ) ) ;
reset ( partitionService ) ;
}
//Start another node during restart
var anotherInfo = TransportProtos . ServiceInfo . newBuilder ( ) . setServiceId ( "anotherId" ) . build ( ) ;
@Test
public void startAnotherNodeDuringRestartTest ( ) throws Exception {
var anotherInfo = TransportProtos . ServiceInfo . newBuilder ( ) . setServiceId ( "tb-transport" ) . build ( ) ;
var anotherData = new ChildData ( "/thingsboard/nodes/0000000030" , null , anotherInfo . toByteArray ( ) ) ;
startNode ( childData ) ;
verify ( partitionService , times ( 1 ) ) . recalculatePartitions ( eq ( currentInfo ) , eq ( List . of ( childInfo ) ) ) ;
reset ( partitionService ) ;
stopNode ( childData ) ;
assertEquals ( 1 , zkDiscoveryService . delayedTasks . size ( ) ) ;
@ -151,9 +167,9 @@ public class ZkDiscoveryServiceTest {
verify ( partitionService , times ( 1 ) ) . recalculatePartitions ( eq ( currentInfo ) , eq ( List . of ( anotherInfo ) ) ) ;
reset ( partitionService ) ;
Thread . sleep ( 2000 ) ;
Thread . sleep ( RECALCULATE_DELAY * 2 ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( eq ( currentInfo ) , any ( ) ) ;
verify ( partitionService , never ( ) ) . recalculatePartitions ( any ( ) , any ( ) ) ;
startNode ( childData ) ;