4343public class Cluster
4444{
4545 private static final String ADMIN_USER = "neo4j" ;
46- private static final int STARTUP_TIMEOUT_SECONDS = 60 ;
46+ private static final int STARTUP_TIMEOUT_SECONDS = 90 ;
47+ private static final int ONLINE_MEMBERS_CHECK_SLEEP_MS = 500 ;
4748
4849 private final Path path ;
4950 private final String password ;
5051 private final Set <ClusterMember > members ;
52+ private final Set <ClusterMember > offlineMembers ;
5153
5254 public Cluster ( Path path , String password )
5355 {
@@ -59,11 +61,12 @@ private Cluster( Path path, String password, Set<ClusterMember> members )
5961 this .path = path ;
6062 this .password = password ;
6163 this .members = members ;
64+ this .offlineMembers = new HashSet <>();
6265 }
6366
6467 Cluster withMembers ( Set <ClusterMember > newMembers ) throws ClusterUnavailableException
6568 {
66- waitForMembers ( newMembers , password );
69+ waitForMembersToBeOnline ( newMembers , password );
6770 return new Cluster ( path , password , newMembers );
6871 }
6972
@@ -126,6 +129,35 @@ public Set<ClusterMember> readReplicas()
126129 return membersWithRole ( ClusterMemberRole .READ_REPLICA );
127130 }
128131
132+ public void start ( ClusterMember member )
133+ {
134+ startNoWait ( member );
135+ waitForMembersToBeOnline ();
136+ }
137+
138+ public void startOfflineMembers ()
139+ {
140+ for ( ClusterMember member : offlineMembers )
141+ {
142+ startNoWait ( member );
143+ }
144+ waitForMembersToBeOnline ();
145+ }
146+
147+ public void stop ( ClusterMember member )
148+ {
149+ removeOfflineMember ( member );
150+ SharedCluster .stop ( member );
151+ waitForMembersToBeOnline ();
152+ }
153+
154+ public void kill ( ClusterMember member )
155+ {
156+ removeOfflineMember ( member );
157+ SharedCluster .kill ( member );
158+ waitForMembersToBeOnline ();
159+ }
160+
129161 @ Override
130162 public String toString ()
131163 {
@@ -135,6 +167,30 @@ public String toString()
135167 "}" ;
136168 }
137169
170+ private void addOfflineMember ( ClusterMember member )
171+ {
172+ if ( !offlineMembers .remove ( member ) )
173+ {
174+ throw new IllegalArgumentException ( "Cluster member is not offline: " + member );
175+ }
176+ members .add ( member );
177+ }
178+
179+ private void removeOfflineMember ( ClusterMember member )
180+ {
181+ if ( !members .remove ( member ) )
182+ {
183+ throw new IllegalArgumentException ( "Unknown cluster member " + member );
184+ }
185+ offlineMembers .add ( member );
186+ }
187+
188+ private void startNoWait ( ClusterMember member )
189+ {
190+ addOfflineMember ( member );
191+ SharedCluster .start ( member );
192+ }
193+
138194 private Set <ClusterMember > membersWithRole ( ClusterMemberRole role )
139195 {
140196 Set <ClusterMember > membersWithRole = new HashSet <>();
@@ -166,41 +222,57 @@ private Set<ClusterMember> membersWithRole( ClusterMemberRole role )
166222 return membersWithRole ;
167223 }
168224
169- private static Set <ClusterMember > waitForMembers ( Set <ClusterMember > members , String password )
225+ private void waitForMembersToBeOnline ()
226+ {
227+ try
228+ {
229+ waitForMembersToBeOnline ( members , password );
230+ }
231+ catch ( ClusterUnavailableException e )
232+ {
233+ throw new RuntimeException ( e );
234+ }
235+ }
236+
237+ private static void waitForMembersToBeOnline ( Set <ClusterMember > members , String password )
170238 throws ClusterUnavailableException
171239 {
172240 if ( members .isEmpty () )
173241 {
174242 throw new IllegalArgumentException ( "No members to wait for" );
175243 }
176244
177- Set <ClusterMember > offlineMembers = new HashSet <>( members );
245+ Set <URI > expectedOnlineUris = extractBoltUris ( members );
246+ Set <URI > actualOnlineUris = Collections .emptySet ();
247+
178248 long deadline = System .currentTimeMillis () + TimeUnit .SECONDS .toMillis ( STARTUP_TIMEOUT_SECONDS );
249+ Throwable error = null ;
179250
180- try ( Driver driver = createDriver ( members , password ) )
251+ while ( ! expectedOnlineUris . equals ( actualOnlineUris ) )
181252 {
182- while ( !offlineMembers .isEmpty () )
253+ sleep ( ONLINE_MEMBERS_CHECK_SLEEP_MS );
254+ assertDeadlineNotReached ( deadline , expectedOnlineUris , actualOnlineUris , error );
255+
256+ try ( Driver driver = createDriver ( members , password );
257+ Session session = driver .session ( AccessMode .READ ) )
183258 {
184- assertDeadlineNotReached ( deadline );
259+ List <Record > records = findClusterOverview ( session );
260+ actualOnlineUris = extractBoltUris ( records );
261+ }
262+ catch ( Throwable t )
263+ {
264+ t .printStackTrace ();
185265
186- try ( Session session = driver . session ( AccessMode . READ ) )
266+ if ( error == null )
187267 {
188- List <Record > records = findClusterOverview ( session );
189- for ( Record record : records )
190- {
191- URI boltUri = extractBoltUri ( record );
192-
193- ClusterMember member = findByBoltUri ( boltUri , offlineMembers );
194- if ( member != null )
195- {
196- offlineMembers .remove ( member );
197- }
198- }
268+ error = t ;
269+ }
270+ else
271+ {
272+ error .addSuppressed ( t );
199273 }
200274 }
201275 }
202-
203- return members ;
204276 }
205277
206278 private static Driver createDriver ( Set <ClusterMember > members , String password )
@@ -243,15 +315,48 @@ private static boolean isCoreMember( Session session )
243315 return role != ClusterMemberRole .READ_REPLICA ;
244316 }
245317
246- private static void assertDeadlineNotReached ( long deadline ) throws ClusterUnavailableException
318+ private static void assertDeadlineNotReached ( long deadline , Set <URI > expectedUris , Set <URI > actualUris ,
319+ Throwable error ) throws ClusterUnavailableException
247320 {
248321 if ( System .currentTimeMillis () > deadline )
249322 {
250- throw new ClusterUnavailableException (
251- "Cluster did not become available in " + STARTUP_TIMEOUT_SECONDS + " seconds" );
323+ String baseMessage = "Cluster did not become available in " + STARTUP_TIMEOUT_SECONDS + " seconds.\n " ;
324+ String errorMessage = error == null ? "" : "There were errors checking cluster members.\n " ;
325+ String expectedUrisMessage = "Expected online URIs: " + expectedUris + "\n " ;
326+ String actualUrisMessage = "Actual last seen online URIs: " + actualUris + "\n " ;
327+ String message = baseMessage + errorMessage + expectedUrisMessage + actualUrisMessage ;
328+
329+ ClusterUnavailableException clusterUnavailable = new ClusterUnavailableException ( message );
330+
331+ if ( error != null )
332+ {
333+ clusterUnavailable .addSuppressed ( error );
334+ }
335+
336+ throw clusterUnavailable ;
252337 }
253338 }
254339
340+ private static Set <URI > extractBoltUris ( Set <ClusterMember > members )
341+ {
342+ Set <URI > uris = new HashSet <>();
343+ for ( ClusterMember member : members )
344+ {
345+ uris .add ( member .getBoltUri () );
346+ }
347+ return uris ;
348+ }
349+
350+ private static Set <URI > extractBoltUris ( List <Record > records )
351+ {
352+ Set <URI > uris = new HashSet <>();
353+ for ( Record record : records )
354+ {
355+ uris .add ( extractBoltUri ( record ) );
356+ }
357+ return uris ;
358+ }
359+
255360 private static URI extractBoltUri ( Record record )
256361 {
257362 List <Object > addresses = record .get ( "addresses" ).asList ();
@@ -307,4 +412,17 @@ private static ClusterMember randomOf( Set<ClusterMember> members )
307412 }
308413 throw new AssertionError ();
309414 }
415+
416+ private static void sleep ( int millis )
417+ {
418+ try
419+ {
420+ Thread .sleep ( millis );
421+ }
422+ catch ( InterruptedException e )
423+ {
424+ Thread .currentThread ().interrupt ();
425+ throw new RuntimeException ( e );
426+ }
427+ }
310428}
0 commit comments