1
1
package org .casbin .watcher .lettuce ;
2
2
3
- import io .lettuce .core .AbstractRedisClient ;
4
- import io .lettuce .core .ClientOptions ;
5
- import io .lettuce .core .RedisClient ;
6
- import io .lettuce .core .RedisURI ;
3
+ import io .lettuce .core .*;
7
4
import io .lettuce .core .cluster .ClusterClientOptions ;
5
+ import io .lettuce .core .cluster .ClusterTopologyRefreshOptions ;
8
6
import io .lettuce .core .cluster .RedisClusterClient ;
7
+ import io .lettuce .core .cluster .RedisClusterURIUtil ;
9
8
import io .lettuce .core .pubsub .StatefulRedisPubSubConnection ;
10
9
import io .lettuce .core .resource .ClientResources ;
11
10
import io .lettuce .core .resource .DefaultClientResources ;
12
11
import org .apache .commons .lang3 .StringUtils ;
13
12
import org .casbin .jcasbin .persist .Watcher ;
14
13
import org .casbin .watcher .lettuce .constants .WatcherConstant ;
14
+ import org .slf4j .Logger ;
15
+ import org .slf4j .LoggerFactory ;
15
16
17
+ import java .net .URI ;
16
18
import java .time .Duration ;
17
19
import java .time .temporal .ChronoUnit ;
20
+ import java .util .List ;
18
21
import java .util .UUID ;
19
22
import java .util .function .Consumer ;
20
23
21
24
public class LettuceRedisWatcher implements Watcher {
25
+ private static final Logger logger = LoggerFactory .getLogger (LettuceRedisWatcher .class );
22
26
private final String localId ;
23
27
private final String redisChannelName ;
24
28
private final AbstractRedisClient abstractRedisClient ;
@@ -33,10 +37,9 @@ public class LettuceRedisWatcher implements Watcher {
33
37
* @param redisChannelName Redis Channel
34
38
* @param timeout Redis Timeout
35
39
* @param password Redis Password
36
- * @param type Redis Type (standalone | cluster)
37
40
*/
38
- public LettuceRedisWatcher (String redisIp , int redisPort , String redisChannelName , int timeout , String password , String type ) {
39
- this .abstractRedisClient = this .getLettuceRedisClient (redisIp , redisPort , password , timeout , type );
41
+ public LettuceRedisWatcher (String redisIp , Integer redisPort , String redisChannelName , int timeout , String password ) {
42
+ this .abstractRedisClient = this .getLettuceRedisClient (redisIp , redisPort , null , password , timeout , WatcherConstant . LETTUCE_REDIS_TYPE_STANDALONE );
40
43
this .localId = UUID .randomUUID ().toString ();
41
44
this .redisChannelName = redisChannelName ;
42
45
this .startSub ();
@@ -48,10 +51,24 @@ public LettuceRedisWatcher(String redisIp, int redisPort, String redisChannelNam
48
51
* @param redisIp Redis IP
49
52
* @param redisPort Redis Port
50
53
* @param redisChannelName Redis Channel
51
- * @param type Redis Type (standalone | cluster)
52
54
*/
53
- public LettuceRedisWatcher (String redisIp , int redisPort , String redisChannelName , String type ) {
54
- this (redisIp , redisPort , redisChannelName , 2000 , null , type );
55
+ public LettuceRedisWatcher (String redisIp , Integer redisPort , String redisChannelName ) {
56
+ this (redisIp , redisPort , redisChannelName , 2000 , null );
57
+ }
58
+
59
+ /**
60
+ * Constructor
61
+ *
62
+ * @param nodes Redis Nodes
63
+ * @param redisChannelName Redis Channel
64
+ * @param timeout Redis Timeout
65
+ * @param password Redis Password
66
+ */
67
+ public LettuceRedisWatcher (String nodes , String redisChannelName , Integer timeout , String password ) {
68
+ this .abstractRedisClient = this .getLettuceRedisClient (null , null , nodes , password , timeout , WatcherConstant .LETTUCE_REDIS_TYPE_CLUSTER );
69
+ this .localId = UUID .randomUUID ().toString ();
70
+ this .redisChannelName = redisChannelName ;
71
+ this .startSub ();
55
72
}
56
73
57
74
@ Override
@@ -86,37 +103,38 @@ private void startSub() {
86
103
*
87
104
* @param host Redis Host
88
105
* @param port Redis Port
106
+ * @param nodes Redis Nodes
89
107
* @param password Redis Password
90
108
* @param timeout Redis Timeout
91
109
* @param type Redis Type (standalone | cluster) default:standalone
92
110
* @return AbstractRedisClient
93
111
*/
94
- private AbstractRedisClient getLettuceRedisClient (String host , int port , String password , int timeout , String type ) {
112
+ private AbstractRedisClient getLettuceRedisClient (String host , Integer port , String nodes , String password , int timeout , String type ) {
95
113
// todo default standalone ?
96
114
// type = StringUtils.isEmpty(type) ? WatcherConstant.LETTUCE_REDIS_TYPE_STANDALONE : type;
97
115
if (StringUtils .isNotEmpty (type ) && StringUtils .equalsAnyIgnoreCase (type ,
98
116
WatcherConstant .LETTUCE_REDIS_TYPE_STANDALONE , WatcherConstant .LETTUCE_REDIS_TYPE_CLUSTER )) {
99
- RedisURI redisUri = null ;
100
- if (StringUtils .isNotEmpty (password )) {
101
- redisUri = RedisURI .builder ()
102
- .withHost (host )
103
- .withPort (port )
104
- .withPassword (password .toCharArray ())
105
- .withTimeout (Duration .of (timeout , ChronoUnit .SECONDS ))
106
- .build ();
107
- } else {
108
- redisUri = RedisURI .builder ()
109
- .withHost (host )
110
- .withPort (port )
111
- .withTimeout (Duration .of (timeout , ChronoUnit .SECONDS ))
112
- .build ();
113
- }
114
117
ClientResources clientResources = DefaultClientResources .builder ()
115
118
.ioThreadPoolSize (4 )
116
119
.computationThreadPoolSize (4 )
117
120
.build ();
118
121
if (StringUtils .equalsIgnoreCase (type , WatcherConstant .LETTUCE_REDIS_TYPE_STANDALONE )) {
119
122
// standalone
123
+ RedisURI redisUri = null ;
124
+ if (StringUtils .isNotEmpty (password )) {
125
+ redisUri = RedisURI .builder ()
126
+ .withHost (host )
127
+ .withPort (port )
128
+ .withPassword (password .toCharArray ())
129
+ .withTimeout (Duration .of (timeout , ChronoUnit .SECONDS ))
130
+ .build ();
131
+ } else {
132
+ redisUri = RedisURI .builder ()
133
+ .withHost (host )
134
+ .withPort (port )
135
+ .withTimeout (Duration .of (timeout , ChronoUnit .SECONDS ))
136
+ .build ();
137
+ }
120
138
ClientOptions clientOptions = ClientOptions .builder ()
121
139
.autoReconnect (true )
122
140
.pingBeforeActivateConnection (true )
@@ -126,12 +144,26 @@ private AbstractRedisClient getLettuceRedisClient(String host, int port, String
126
144
return redisClient ;
127
145
} else {
128
146
// cluster
147
+ TimeoutOptions timeoutOptions = TimeoutOptions .builder ().fixedTimeout (Duration .of (timeout , ChronoUnit .SECONDS )).build ();
148
+ ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions .builder ()
149
+ .enablePeriodicRefresh (Duration .of (10 , ChronoUnit .MINUTES ))
150
+ .enableAdaptiveRefreshTrigger (ClusterTopologyRefreshOptions .RefreshTrigger .MOVED_REDIRECT , ClusterTopologyRefreshOptions .RefreshTrigger .PERSISTENT_RECONNECTS )
151
+ .adaptiveRefreshTriggersTimeout (Duration .of (30 , ChronoUnit .SECONDS ))
152
+ .build ();
129
153
ClusterClientOptions clusterClientOptions = ClusterClientOptions .builder ()
130
154
.autoReconnect (true )
155
+ .timeoutOptions (timeoutOptions )
156
+ .topologyRefreshOptions (topologyRefreshOptions )
131
157
.pingBeforeActivateConnection (true )
132
158
.validateClusterNodeMembership (true )
133
159
.build ();
134
- RedisClusterClient redisClusterClient = RedisClusterClient .create (clientResources , redisUri );
160
+ // Redis Cluster Node
161
+ String redisUri = StringUtils .isNotEmpty (password ) ?
162
+ WatcherConstant .REDIS_URI_PREFIX .concat (password ).concat (WatcherConstant .REDIS_URI_PASSWORD_SPLIT ).concat (nodes ) :
163
+ WatcherConstant .REDIS_URI_PREFIX .concat (nodes );
164
+ logger .info ("Redis Cluster Uri: {}" , redisUri );
165
+ List <RedisURI > redisURIList = RedisClusterURIUtil .toRedisURIs (URI .create (redisUri ));
166
+ RedisClusterClient redisClusterClient = RedisClusterClient .create (clientResources , redisURIList );
135
167
redisClusterClient .setOptions (clusterClientOptions );
136
168
return redisClusterClient ;
137
169
}
0 commit comments