@@ -22,7 +22,7 @@ public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int
2222 public abstract T execute (Jedis connection );
2323
2424 public T run (String key ) {
25- return runWithRetries (JedisClusterCRC16 .getSlot (key ), this . maxAttempts , false , null );
25+ return runWithRetries (JedisClusterCRC16 .getSlot (key ));
2626 }
2727
2828 public T run (int keyCount , String ... keys ) {
@@ -42,11 +42,11 @@ public T run(int keyCount, String... keys) {
4242 }
4343 }
4444
45- return runWithRetries (slot , this . maxAttempts , false , null );
45+ return runWithRetries (slot );
4646 }
4747
4848 public T runBinary (byte [] key ) {
49- return runWithRetries (JedisClusterCRC16 .getSlot (key ), this . maxAttempts , false , null );
49+ return runWithRetries (JedisClusterCRC16 .getSlot (key ));
5050 }
5151
5252 public T runBinary (int keyCount , byte []... keys ) {
@@ -66,7 +66,7 @@ public T runBinary(int keyCount, byte[]... keys) {
6666 }
6767 }
6868
69- return runWithRetries (slot , this . maxAttempts , false , null );
69+ return runWithRetries (slot );
7070 }
7171
7272 public T runWithAnyNode () {
@@ -79,62 +79,87 @@ public T runWithAnyNode() {
7979 }
8080 }
8181
82- private T runWithRetries (final int slot , int attempts , boolean tryRandomNode , JedisRedirectionException redirect ) {
83- if (attempts <= 0 ) {
84- throw new JedisClusterMaxAttemptsException ("No more cluster attempts left." );
85- }
82+ private interface ConnectionGetter {
83+ Jedis getConnection ();
84+ }
8685
87- Jedis connection = null ;
88- try {
86+ private T runWithRetries (final int slot ) {
87+ ConnectionGetter connectionGetter = new ConnectionGetter () {
88+ @ Override
89+ public Jedis getConnection () {
90+ return connectionHandler .getConnectionFromSlot (slot );
91+ }
92+ };
8993
90- if (redirect != null ) {
91- connection = this .connectionHandler .getConnectionFromNode (redirect .getTargetNode ());
92- if (redirect instanceof JedisAskDataException ) {
93- // TODO: Pipeline asking with the original command to make it faster....
94- connection .asking ();
95- }
96- } else {
97- if (tryRandomNode ) {
98- connection = connectionHandler .getConnection ();
94+ // If we got one redirection, stick with that and don't try anything else
95+ ConnectionGetter redirected = null ;
96+
97+ for (int currentAttempt = 0 ; currentAttempt < this .maxAttempts ; currentAttempt ++) {
98+ Jedis connection = null ;
99+ try {
100+ if (redirected != null ) {
101+ connection = redirected .getConnection ();
99102 } else {
100- connection = connectionHandler .getConnectionFromSlot (slot );
103+ connection = connectionGetter .getConnection ();
104+ }
105+ return execute (connection );
106+ } catch (JedisNoReachableClusterNodeException e ) {
107+ throw e ;
108+ } catch (JedisConnectionException e ) {
109+ connectionGetter = handleConnectionProblem (slot , currentAttempt );
110+ } catch (JedisRedirectionException e ) {
111+ redirected = handleRedirection (connection , e );
112+ } finally {
113+ if (connection != null ) {
114+ releaseConnection (connection );
101115 }
102116 }
117+ }
103118
104- return execute (connection );
119+ throw new JedisClusterMaxAttemptsException ("No more cluster attempts left." );
120+ }
105121
106- } catch (JedisNoReachableClusterNodeException jnrcne ) {
107- throw jnrcne ;
108- } catch (JedisConnectionException jce ) {
109- // release current connection before recursion
110- releaseConnection (connection );
111- connection = null ;
112-
113- if (attempts <= 1 ) {
114- //We need this because if node is not reachable anymore - we need to finally initiate slots
115- //renewing, or we can stuck with cluster state without one node in opposite case.
116- //But now if maxAttempts = [1 or 2] we will do it too often.
117- //TODO make tracking of successful/unsuccessful operations for node - do renewing only
118- //if there were no successful responses from this node last few seconds
119- this .connectionHandler .renewSlotCache ();
120- }
122+ private ConnectionGetter handleConnectionProblem (final int slot , int currentAttempt ) {
123+ int attemptsLeft = (maxAttempts - currentAttempt ) - 1 ;
124+ if (attemptsLeft <= 1 ) {
125+ //We need this because if node is not reachable anymore - we need to finally initiate slots
126+ //renewing, or we can stuck with cluster state without one node in opposite case.
127+ //But now if maxAttempts = [1 or 2] we will do it too often.
128+ //TODO make tracking of successful/unsuccessful operations for node - do renewing only
129+ //if there were no successful responses from this node last few seconds
130+ this .connectionHandler .renewSlotCache ();
131+ }
121132
122- return runWithRetries (slot , attempts - 1 , tryRandomNode , redirect );
123- } catch (JedisRedirectionException jre ) {
124- // if MOVED redirection occurred,
125- if (jre instanceof JedisMovedDataException ) {
126- // it rebuilds cluster's slot cache recommended by Redis cluster specification
127- this .connectionHandler .renewSlotCache (connection );
133+ return new ConnectionGetter () {
134+ @ Override
135+ public Jedis getConnection () {
136+ return connectionHandler .getConnectionFromSlot (slot );
128137 }
138+ };
139+ }
129140
130- // release current connection before recursion
131- releaseConnection (connection );
132- connection = null ;
133-
134- return runWithRetries (slot , attempts - 1 , false , jre );
135- } finally {
136- releaseConnection (connection );
141+ private ConnectionGetter handleRedirection (Jedis connection , final JedisRedirectionException jre ) {
142+ // if MOVED redirection occurred,
143+ if (jre instanceof JedisMovedDataException ) {
144+ // it rebuilds cluster's slot cache recommended by Redis cluster specification
145+ this .connectionHandler .renewSlotCache (connection );
137146 }
147+
148+ // release current connection before iteration
149+ releaseConnection (connection );
150+
151+ return new ConnectionGetter () {
152+ @ Override
153+ public Jedis getConnection () {
154+ Jedis connection = connectionHandler .getConnectionFromNode (jre .getTargetNode ());
155+ if (jre instanceof JedisAskDataException ) {
156+ // TODO: Pipeline asking with the original command to make it faster....
157+ connection .asking ();
158+ }
159+
160+ return connection ;
161+ }
162+ };
138163 }
139164
140165 private void releaseConnection (Jedis connection ) {
0 commit comments