@@ -71,11 +71,8 @@ type ClusterOptions struct {
 	WriteTimeout time.Duration
 	ContextTimeoutEnabled bool
 
-	// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
-	PoolFIFO bool
-
-	// PoolSize applies per cluster node and not for the whole cluster.
-	PoolSize int
+	PoolFIFO bool
+	PoolSize int // applies per cluster node and not for the whole cluster
 	PoolTimeout time.Duration
 	MinIdleConns int
 	MaxIdleConns int
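Usage note: a minimal configuration sketch of the per-node pool settings touched above. The import path, addresses, and sizes are assumptions, not taken from this commit.

package main

import "github.com/go-redis/redis/v9" // import path assumed; adjust to the go-redis version you use

func main() {
	// PoolFIFO and PoolSize are per node: every cluster node gets its own
	// connection pool with up to PoolSize connections.
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:    []string{":7000", ":7001", ":7002"}, // example addresses
		PoolFIFO: true, // FIFO pool order instead of the default LIFO
		PoolSize: 50,   // 50 connections per cluster node, not 50 in total
	})
	defer rdb.Close()
}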
@@ -391,6 +388,7 @@ type clusterNodes struct {
 	nodes map[string]*clusterNode
 	activeAddrs []string
 	closed bool
+	onNewNode []func(rdb *Client)
 
 	_generation uint32 // atomic
 }
@@ -426,6 +424,12 @@ func (c *clusterNodes) Close() error {
 	return firstErr
 }
 
+func (c *clusterNodes) OnNewNode(fn func(rdb *Client)) {
+	c.mu.Lock()
+	c.onNewNode = append(c.onNewNode, fn)
+	c.mu.Unlock()
+}
+
 func (c *clusterNodes) Addrs() ([]string, error) {
 	var addrs []string
 
@@ -503,6 +507,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
 	}
 
 	node = newClusterNode(c.opt, addr)
+	for _, fn := range c.onNewNode {
+		fn(node.Client)
+	}
 
 	c.addrs = appendIfNotExists(c.addrs, addr)
 	c.nodes[addr] = node
@@ -812,18 +819,14 @@ func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, er
 
 //------------------------------------------------------------------------------
 
-type clusterClient struct {
-	opt *ClusterOptions
-	nodes *clusterNodes
-	state *clusterStateHolder //nolint:structcheck
-	cmdsInfoCache *cmdsInfoCache //nolint:structcheck
-}
-
 // ClusterClient is a Redis Cluster client representing a pool of zero
 // or more underlying connections. It's safe for concurrent use by
 // multiple goroutines.
 type ClusterClient struct {
-	*clusterClient
+	opt *ClusterOptions
+	nodes *clusterNodes
+	state *clusterStateHolder
+	cmdsInfoCache *cmdsInfoCache
 	cmdable
 	hooks
 }
@@ -834,15 +837,18 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
 	opt.init()
 
 	c := &ClusterClient{
-		clusterClient: &clusterClient{
-			opt: opt,
-			nodes: newClusterNodes(opt),
-		},
+		opt: opt,
+		nodes: newClusterNodes(opt),
 	}
+
 	c.state = newClusterStateHolder(c.loadState)
 	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
 	c.cmdable = c.Process
 
+	c.hooks.process = c.process
+	c.hooks.processPipeline = c._processPipeline
+	c.hooks.processTxPipeline = c._processTxPipeline
+
 	return c
 }
 
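Note: the constructor now assigns the client's own process/pipeline functions to c.hooks, presumably as the innermost links that installed hooks wrap. Below is a minimal, self-contained sketch of that chaining idea with made-up types; it is not go-redis's actual hooks implementation.

package main

import (
	"context"
	"fmt"
)

// processFn stands in for a "run this command" step.
type processFn func(ctx context.Context, cmd string) error

// hooks holds the current chain; the innermost function is the client's own step.
type hooks struct {
	process processFn
}

// addHook wraps the existing chain, so later hooks run outermost.
func (h *hooks) addHook(wrapper func(next processFn) processFn) {
	h.process = wrapper(h.process)
}

func main() {
	h := hooks{
		// Innermost step: analogous to wiring c.hooks.process = c.process.
		process: func(ctx context.Context, cmd string) error {
			fmt.Println("executing", cmd)
			return nil
		},
	}

	// An installed hook wraps the chain and sees every command.
	h.addHook(func(next processFn) processFn {
		return func(ctx context.Context, cmd string) error {
			fmt.Println("before", cmd)
			err := next(ctx, cmd)
			fmt.Println("after", cmd)
			return err
		}
	})

	_ = h.process(context.Background(), "GET key")
}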
@@ -873,13 +879,14 @@ func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
 }
 
 func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
-	return c.hooks.process(ctx, cmd, c.process)
+	err := c.hooks.process(ctx, cmd)
+	cmd.SetErr(err)
+	return err
 }
 
 func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
 	cmdInfo := c.cmdInfo(ctx, cmd.Name())
 	slot := c.cmdSlot(ctx, cmd)
-
 	var node *clusterNode
 	var ask bool
 	var lastErr error
@@ -899,11 +906,12 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
 		}
 
 		if ask {
+			ask = false
+
 			pipe := node.Client.Pipeline()
 			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
 			_ = pipe.Process(ctx, cmd)
 			_, lastErr = pipe.Exec(ctx)
-			ask = false
 		} else {
 			lastErr = node.Client.Process(ctx, cmd)
 		}
@@ -958,6 +966,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
 	return lastErr
 }
 
+func (c *ClusterClient) OnNewNode(fn func(rdb *Client)) {
+	c.nodes.OnNewNode(fn)
+}
+
 // ForEachMaster concurrently calls the fn on each master node in the cluster.
 // It returns the first error if any.
 func (c *ClusterClient) ForEachMaster(
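Usage note: with OnNewNode exposed on ClusterClient as above, a caller can observe every node client as it is created (node clients are created lazily, so the callback should be registered before issuing commands). A sketch with assumed import path and addresses:

package main

import (
	"log"

	"github.com/go-redis/redis/v9" // import path assumed; adjust to your version
)

func main() {
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{":7000", ":7001", ":7002"}, // example addresses
	})
	defer rdb.Close()

	// The callback runs once for every node *redis.Client the cluster client
	// creates; Options().Addr identifies which node it talks to.
	rdb.OnNewNode(func(node *redis.Client) {
		log.Printf("cluster added node %s", node.Options().Addr)
	})
}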
@@ -1165,7 +1177,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
 
 func (c *ClusterClient) Pipeline() Pipeliner {
 	pipe := Pipeline{
-		exec: c.processPipeline,
+		exec: pipelineExecer(c.hooks.processPipeline),
 	}
 	pipe.init()
 	return &pipe
@@ -1175,10 +1187,6 @@ func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error)
 	return c.Pipeline().Pipelined(ctx, fn)
 }
 
-func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
-	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
-}
-
 func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
 	cmdsMap := newCmdsMap()
 
@@ -1258,7 +1266,7 @@ func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool
 func (c *ClusterClient) _processPipelineNode(
 	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
 ) {
-	_ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+	_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
 		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
 			if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
 				return writeCmds(wr, cmds)
@@ -1344,7 +1352,10 @@ func (c *ClusterClient) checkMovedErr(
 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
 func (c *ClusterClient) TxPipeline() Pipeliner {
 	pipe := Pipeline{
-		exec: c.processTxPipeline,
+		exec: func(ctx context.Context, cmds []Cmder) error {
+			cmds = wrapMultiExec(ctx, cmds)
+			return c.hooks.processTxPipeline(ctx, cmds)
+		},
 	}
 	pipe.init()
 	return &pipe
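Usage note: the public TxPipeline behavior is unchanged; queued commands are still wrapped in MULTI/EXEC and grouped by hash slot. A hedged usage sketch with made-up keys (the {user1} hash tag keeps both keys in one slot):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v9" // import path assumed
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{":7000", ":7001", ":7002"}, // example addresses
	})
	defer rdb.Close()

	// TxPipelined queues commands, wraps them in MULTI/EXEC, and sends them
	// to the node owning the slot.
	var incr *redis.IntCmd
	_, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
		incr = pipe.Incr(ctx, "counter:{user1}")
		pipe.Expire(ctx, "counter:{user1}", time.Hour)
		return nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(incr.Val())
}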
@@ -1354,10 +1365,6 @@ func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) erro
 	return c.TxPipeline().Pipelined(ctx, fn)
 }
 
-func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
-	return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
-}
-
 func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
 	// Trim multi .. exec.
 	cmds = cmds[1 : len(cmds)-1]
@@ -1419,38 +1426,38 @@ func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int
 func (c *ClusterClient) _processTxPipelineNode(
 	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
 ) {
-	_ = node.Client.hooks.processTxPipeline(
-		ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
-			return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
-				if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
-					return writeCmds(wr, cmds)
-				}); err != nil {
-					setCmdsErr(cmds, err)
-					return err
-				}
-
-				return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
-					statusCmd := cmds[0].(*StatusCmd)
-					// Trim multi and exec.
-					trimmedCmds := cmds[1 : len(cmds)-1]
+	cmds = wrapMultiExec(ctx, cmds)
+	_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
+			if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
+				return writeCmds(wr, cmds)
+			}); err != nil {
+				setCmdsErr(cmds, err)
+				return err
+			}
 
-					if err := c.txPipelineReadQueued(
-						ctx, rd, statusCmd, trimmedCmds, failedCmds,
-					); err != nil {
-						setCmdsErr(cmds, err)
+			return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
+				statusCmd := cmds[0].(*StatusCmd)
+				// Trim multi and exec.
+				trimmedCmds := cmds[1 : len(cmds)-1]
 
-						moved, ask, addr := isMovedError(err)
-						if moved || ask {
-							return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
-						}
+				if err := c.txPipelineReadQueued(
+					ctx, rd, statusCmd, trimmedCmds, failedCmds,
+				); err != nil {
+					setCmdsErr(cmds, err)
 
-						return err
+					moved, ask, addr := isMovedError(err)
+					if moved || ask {
+						return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
 					}
 
-					return pipelineReadCmds(rd, trimmedCmds)
-				})
+					return err
+				}
+
+				return pipelineReadCmds(rd, trimmedCmds)
 			})
 		})
+	})
 }
 
 func (c *ClusterClient) txPipelineReadQueued(
@@ -1742,7 +1749,7 @@ func (c *ClusterClient) cmdNode(
 	return state.slotMasterNode(slot)
 }
 
-func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
+func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
 	if c.opt.RouteByLatency {
 		return state.slotClosestNode(slot)
 	}
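Usage note: slotReadOnlyNode is only consulted when read-only routing is enabled. A configuration sketch (import path and addresses assumed; RouteByLatency implies ReadOnly):

package main

import "github.com/go-redis/redis/v9" // import path assumed

func main() {
	// With read-only routing enabled, read-only commands may go to replicas;
	// RouteByLatency picks the lowest-latency node for a slot, which is what
	// slotReadOnlyNode resolves above.
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:          []string{":7000", ":7001", ":7002"}, // example addresses
		ReadOnly:       true,
		RouteByLatency: true,
	})
	defer rdb.Close()
}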