11package redis .clients .jedis ;
22
33import java .util .Arrays ;
4- import java .util .HashSet ;
4+ import java .util .Collection ;
5+ import java .util .LinkedList ;
56import java .util .List ;
67import java .util .Set ;
78import java .util .concurrent .atomic .AtomicBoolean ;
9+ import java .util .stream .Collectors ;
810
911import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
1012import org .slf4j .Logger ;
1517
1618public class JedisSentinelPool extends JedisPoolAbstract {
1719
18- /**
19- * @deprecated This will be private in future.
20- */
21- @ Deprecated
22- protected static Logger log = LoggerFactory .getLogger (JedisSentinelPool .class );
20+ private static final Logger LOG = LoggerFactory .getLogger (JedisSentinelPool .class );
2321
24- protected final GenericObjectPoolConfig poolConfig ;
22+ private final GenericObjectPoolConfig poolConfig ;
2523
26- protected final int connectionTimeout ;
27- protected final int soTimeout ;
28- protected final int infiniteSoTimeout ;
24+ private final JedisClientConfig masterClientConfig ;
2925
30- protected final String user ;
31- protected final String password ;
32- protected final int database ;
33- protected final String clientName ;
26+ private final JedisClientConfig sentinelClientConfig ;
3427
35- protected int sentinelConnectionTimeout ;
36- protected int sentinelSoTimeout ;
37- protected String sentinelUser ;
38- protected String sentinelPassword ;
39- protected String sentinelClientName ;
40-
41- protected final Set <MasterListener > masterListeners = new HashSet <>();
28+ private final Collection <MasterListener > masterListeners = new LinkedList <>();
4229
4330 private volatile JedisFactory factory ;
4431 private volatile HostAndPort currentHostMaster ;
@@ -160,25 +147,32 @@ public JedisSentinelPool(String masterName, Set<String> sentinels,
160147 final String user , final String password , final int database , final String clientName ,
161148 final int sentinelConnectionTimeout , final int sentinelSoTimeout , final String sentinelUser ,
162149 final String sentinelPassword , final String sentinelClientName ) {
150+ this (masterName , parseHostAndPorts (sentinels ), poolConfig ,
151+ DefaultJedisClientConfig .builder ().withConnectionTimeout (connectionTimeout )
152+ .withSoTimeout (soTimeout ).withInfiniteSoTimeout (infiniteSoTimeout ).withUser (user )
153+ .withPassword (password ).withDatabse (database ).withClientName (clientName ).build (),
154+ DefaultJedisClientConfig .builder ().withConnectionTimeout (sentinelConnectionTimeout )
155+ .withSoTimeout (sentinelSoTimeout ).withUser (sentinelUser ).withPassword (sentinelPassword )
156+ .withClientName (sentinelClientName ).build ()
157+ );
158+ }
159+
160+ public JedisSentinelPool (String masterName , Set <HostAndPort > sentinels ,
161+ final GenericObjectPoolConfig poolConfig , final JedisClientConfig masteClientConfig ,
162+ final JedisClientConfig sentinelClientConfig ) {
163163
164164 this .poolConfig = poolConfig ;
165- this .connectionTimeout = connectionTimeout ;
166- this .soTimeout = soTimeout ;
167- this .infiniteSoTimeout = infiniteSoTimeout ;
168- this .user = user ;
169- this .password = password ;
170- this .database = database ;
171- this .clientName = clientName ;
172- this .sentinelConnectionTimeout = sentinelConnectionTimeout ;
173- this .sentinelSoTimeout = sentinelSoTimeout ;
174- this .sentinelUser = sentinelUser ;
175- this .sentinelPassword = sentinelPassword ;
176- this .sentinelClientName = sentinelClientName ;
165+ this .masterClientConfig = masteClientConfig ;
166+ this .sentinelClientConfig = sentinelClientConfig ;
177167
178168 HostAndPort master = initSentinels (sentinels , masterName );
179169 initPool (master );
180170 }
181171
172+ private static Set <HostAndPort > parseHostAndPorts (Set <String > strings ) {
173+ return strings .parallelStream ().map (str -> HostAndPort .parseString (str )).collect (Collectors .toSet ());
174+ }
175+
182176 @ Override
183177 public void destroy () {
184178 for (MasterListener m : masterListeners ) {
@@ -192,13 +186,12 @@ public HostAndPort getCurrentHostMaster() {
192186 return currentHostMaster ;
193187 }
194188
195- private void initPool (HostAndPort master ) {
196- synchronized (initPoolLock ){
189+ private void initPool (final HostAndPort master ) {
190+ synchronized (initPoolLock ) {
197191 if (!master .equals (currentHostMaster )) {
198192 currentHostMaster = master ;
199193 if (factory == null ) {
200- factory = new JedisFactory (master .getHost (), master .getPort (), connectionTimeout ,
201- soTimeout , infiniteSoTimeout , user , password , database , clientName );
194+ factory = new JedisFactory (master , masterClientConfig );
202195 initPool (poolConfig , factory );
203196 } else {
204197 factory .setHostAndPort (currentHostMaster );
@@ -207,59 +200,48 @@ private void initPool(HostAndPort master) {
207200 clearInternalPool ();
208201 }
209202
210- log .info ("Created JedisPool to master at {}" , master );
203+ LOG .info ("Created JedisSentinelPool to master at {}" , master );
211204 }
212205 }
213206 }
214207
215- private HostAndPort initSentinels (Set <String > sentinels , final String masterName ) {
208+ private HostAndPort initSentinels (Set <HostAndPort > sentinels , final String masterName ) {
216209
217210 HostAndPort master = null ;
218211 boolean sentinelAvailable = false ;
219212
220- log .info ("Trying to find master from available Sentinels..." );
213+ LOG .info ("Trying to find master from available Sentinels..." );
221214
222- for (String sentinel : sentinels ) {
223- final HostAndPort hap = HostAndPort .parseString (sentinel );
215+ for (HostAndPort sentinel : sentinels ) {
224216
225- log .debug ("Connecting to Sentinel {}" , hap );
217+ LOG .debug ("Connecting to Sentinel {}" , sentinel );
226218
227-
228- try (Jedis jedis = new Jedis (hap .getHost (), hap .getPort (), sentinelConnectionTimeout , sentinelSoTimeout )){
229- if (sentinelUser != null ) {
230- jedis .auth (sentinelUser , sentinelPassword );
231- } else if (sentinelPassword != null ) {
232- jedis .auth (sentinelPassword );
233- }
234- if (sentinelClientName != null ) {
235- jedis .clientSetname (sentinelClientName );
236- }
219+ try (Jedis jedis = new Jedis (sentinel , sentinelClientConfig )) {
237220
238221 List <String > masterAddr = jedis .sentinelGetMasterAddrByName (masterName );
239222
240223 // connected to sentinel...
241224 sentinelAvailable = true ;
242225
243226 if (masterAddr == null || masterAddr .size () != 2 ) {
244- log .warn ("Can not get master addr, master name: {}. Sentinel: {}" , masterName , hap );
227+ LOG .warn ("Can not get master addr, master name: {}. Sentinel: {}" , masterName , sentinel );
245228 continue ;
246229 }
247230
248231 master = toHostAndPort (masterAddr );
249- log .debug ("Found Redis master at {}" , master );
232+ LOG .debug ("Found Redis master at {}" , master );
250233 break ;
251234 } catch (JedisException e ) {
252235 // resolves #1036, it should handle JedisException there's another chance
253236 // of raising JedisDataException
254- log .warn (
255- "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one." , hap , e );
237+ LOG .warn (
238+ "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one." , sentinel , e );
256239 }
257240 }
258241
259242 if (master == null ) {
260243 if (sentinelAvailable ) {
261- // can connect to sentinel, but master name seems to not
262- // monitored
244+ // can connect to sentinel, but master name seems to not monitored
263245 throw new JedisException ("Can connect to sentinel, but " + masterName
264246 + " seems to be not monitored..." );
265247 } else {
@@ -268,11 +250,11 @@ private HostAndPort initSentinels(Set<String> sentinels, final String masterName
268250 }
269251 }
270252
271- log .info ("Redis master running at {}, starting Sentinel listeners..." , master );
253+ LOG .info ("Redis master running at {}, starting Sentinel listeners..." , master );
254+
255+ for (HostAndPort sentinel : sentinels ) {
272256
273- for (String sentinel : sentinels ) {
274- final HostAndPort hap = HostAndPort .parseString (sentinel );
275- MasterListener masterListener = new MasterListener (masterName , hap .getHost (), hap .getPort ());
257+ MasterListener masterListener = new MasterListener (masterName , sentinel .getHost (), sentinel .getPort ());
276258 // whether MasterListener threads are alive or not, process can be stopped
277259 masterListener .setDaemon (true );
278260 masterListeners .add (masterListener );
@@ -317,7 +299,7 @@ public void returnResource(final Jedis resource) {
317299 returnResourceObject (resource );
318300 } catch (Exception e ) {
319301 returnBrokenResource (resource );
320- log .debug ("Resource is returned to the pool as broken" , e );
302+ LOG .debug ("Resource is returned to the pool as broken" , e );
321303 }
322304 }
323305 }
@@ -360,28 +342,21 @@ public void run() {
360342 break ;
361343 }
362344
363- j = new Jedis (host , port , sentinelConnectionTimeout , sentinelSoTimeout );
364- if (sentinelUser != null ) {
365- j .auth (sentinelUser , sentinelPassword );
366- } else if (sentinelPassword != null ) {
367- j .auth (sentinelPassword );
368- }
369- if (sentinelClientName != null ) {
370- j .clientSetname (sentinelClientName );
371- }
345+ final HostAndPort hostPort = new HostAndPort (host , port );
346+ j = new Jedis (hostPort , sentinelClientConfig );
372347
373348 // code for active refresh
374349 List <String > masterAddr = j .sentinelGetMasterAddrByName (masterName );
375350 if (masterAddr == null || masterAddr .size () != 2 ) {
376- log .warn ("Can not get master addr, master name: {}. Sentinel: {}:{} ." , masterName , host , port );
351+ LOG .warn ("Can not get master addr, master name: {}. Sentinel: {}." , masterName , hostPort );
377352 } else {
378353 initPool (toHostAndPort (masterAddr ));
379354 }
380355
381356 j .subscribe (new JedisPubSub () {
382357 @ Override
383358 public void onMessage (String channel , String message ) {
384- log .debug ("Sentinel {}:{} published: {}." , host , port , message );
359+ LOG .debug ("Sentinel {} published: {}." , hostPort , message );
385360
386361 String [] switchMasterMsg = message .split (" " );
387362
@@ -390,31 +365,30 @@ public void onMessage(String channel, String message) {
390365 if (masterName .equals (switchMasterMsg [0 ])) {
391366 initPool (toHostAndPort (Arrays .asList (switchMasterMsg [3 ], switchMasterMsg [4 ])));
392367 } else {
393- log .debug (
368+ LOG .debug (
394369 "Ignoring message on +switch-master for master name {}, our master name is {}" ,
395370 switchMasterMsg [0 ], masterName );
396371 }
397372
398373 } else {
399- log .error (
400- "Invalid message received on Sentinel {}:{} on channel +switch-master: {}" , host ,
401- port , message );
374+ LOG .error ("Invalid message received on Sentinel {} on channel +switch-master: {}" ,
375+ hostPort , message );
402376 }
403377 }
404378 }, "+switch-master" );
405379
406380 } catch (JedisException e ) {
407381
408382 if (running .get ()) {
409- log .error ("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying." , host ,
383+ LOG .error ("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying." , host ,
410384 port , e );
411385 try {
412386 Thread .sleep (subscribeRetryWaitTimeMillis );
413387 } catch (InterruptedException e1 ) {
414- log .error ("Sleep interrupted: " , e1 );
388+ LOG .error ("Sleep interrupted: " , e1 );
415389 }
416390 } else {
417- log .debug ("Unsubscribing from Sentinel at {}:{}" , host , port );
391+ LOG .debug ("Unsubscribing from Sentinel at {}:{}" , host , port );
418392 }
419393 } finally {
420394 if (j != null ) {
@@ -426,14 +400,14 @@ public void onMessage(String channel, String message) {
426400
427401 public void shutdown () {
428402 try {
429- log .debug ("Shutting down listener on {}:{}" , host , port );
403+ LOG .debug ("Shutting down listener on {}:{}" , host , port );
430404 running .set (false );
431405 // This isn't good, the Jedis object is not thread safe
432406 if (j != null ) {
433- j .disconnect ();
407+ j .close ();
434408 }
435409 } catch (Exception e ) {
436- log .error ("Caught exception while shutting down: " , e );
410+ LOG .error ("Caught exception while shutting down: " , e );
437411 }
438412 }
439413 }
0 commit comments