Skip to content

Commit 4568d8f

Browse files
committed
Merge branch 'master' of https://github.com/redis/go-redis into os-add-docstring-to-tsmadd
2 parents f45f9ad + 277e8b7 commit 4568d8f

File tree

11 files changed

+224
-8
lines changed

11 files changed

+224
-8
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828

2929
steps:
3030
- name: Set up ${{ matrix.go-version }}
31-
uses: actions/setup-go@v4
31+
uses: actions/setup-go@v5
3232
with:
3333
go-version: ${{ matrix.go-version }}
3434

.github/workflows/doctests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929

3030
steps:
3131
- name: Set up ${{ matrix.go-version }}
32-
uses: actions/setup-go@v4
32+
uses: actions/setup-go@v5
3333
with:
3434
go-version: ${{ matrix.go-version }}
3535

.github/workflows/spellcheck.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
- name: Checkout
99
uses: actions/checkout@v4
1010
- name: Check Spelling
11-
uses: rojopolis/spellcheck-github-actions@0.34.0
11+
uses: rojopolis/spellcheck-github-actions@0.35.0
1212
with:
1313
config_path: .github/spellcheck-settings.yml
1414
task_name: Markdown

.github/workflows/stale-issues.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212

1313
runs-on: ubuntu-latest
1414
steps:
15-
- uses: actions/stale@v8
15+
- uses: actions/stale@v9
1616
with:
1717
repo-token: ${{ secrets.GITHUB_TOKEN }}
1818
stale-issue-message: 'This issue is marked stale. It will be closed in 30 days if it is not updated.'

command.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"regexp"
99
"strconv"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
"github.com/redis/go-redis/v9/internal"
@@ -5381,3 +5382,85 @@ func (cmd *InfoCmd) Item(section, key string) string {
53815382
return cmd.val[section][key]
53825383
}
53835384
}
5385+
5386+
type MonitorStatus int
5387+
5388+
const (
5389+
monitorStatusIdle MonitorStatus = iota
5390+
monitorStatusStart
5391+
monitorStatusStop
5392+
)
5393+
5394+
type MonitorCmd struct {
5395+
baseCmd
5396+
ch chan string
5397+
status MonitorStatus
5398+
mu sync.Mutex
5399+
}
5400+
5401+
func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd {
5402+
return &MonitorCmd{
5403+
baseCmd: baseCmd{
5404+
ctx: ctx,
5405+
args: []interface{}{"monitor"},
5406+
},
5407+
ch: ch,
5408+
status: monitorStatusIdle,
5409+
mu: sync.Mutex{},
5410+
}
5411+
}
5412+
5413+
func (cmd *MonitorCmd) String() string {
5414+
return cmdString(cmd, nil)
5415+
}
5416+
5417+
func (cmd *MonitorCmd) readReply(rd *proto.Reader) error {
5418+
ctx, cancel := context.WithCancel(cmd.ctx)
5419+
go func(ctx context.Context) {
5420+
for {
5421+
select {
5422+
case <-ctx.Done():
5423+
return
5424+
default:
5425+
err := cmd.readMonitor(rd, cancel)
5426+
if err != nil {
5427+
cmd.err = err
5428+
return
5429+
}
5430+
}
5431+
}
5432+
}(ctx)
5433+
return nil
5434+
}
5435+
5436+
func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error {
5437+
for {
5438+
cmd.mu.Lock()
5439+
st := cmd.status
5440+
cmd.mu.Unlock()
5441+
if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart {
5442+
line, err := rd.ReadString()
5443+
if err != nil {
5444+
return err
5445+
}
5446+
cmd.ch <- line
5447+
}
5448+
if st == monitorStatusStop {
5449+
cancel()
5450+
break
5451+
}
5452+
}
5453+
return nil
5454+
}
5455+
5456+
func (cmd *MonitorCmd) Start() {
5457+
cmd.mu.Lock()
5458+
defer cmd.mu.Unlock()
5459+
cmd.status = monitorStatusStart
5460+
}
5461+
5462+
func (cmd *MonitorCmd) Stop() {
5463+
cmd.mu.Lock()
5464+
defer cmd.mu.Unlock()
5465+
cmd.status = monitorStatusStop
5466+
}

commands.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ type Cmdable interface {
204204
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
205205
Time(ctx context.Context) *TimeCmd
206206
DebugObject(ctx context.Context, key string) *StringCmd
207-
208207
MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd
209208

210209
ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
@@ -700,3 +699,20 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St
700699
_ = c(ctx, cmd)
701700
return cmd
702701
}
702+
703+
/*
704+
Monitor - represents a Redis MONITOR command, allowing the user to capture
705+
and process all commands sent to a Redis server. This mimics the behavior of
706+
MONITOR in the redis-cli.
707+
708+
Notes:
709+
- Using MONITOR blocks the connection to the server for itself. It needs a dedicated connection
710+
- The user should create a channel of type string
711+
- This runs concurrently in the background. Trigger via the Start and Stop functions
712+
See further: Redis MONITOR command: https://redis.io/commands/monitor
713+
*/
714+
func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd {
715+
cmd := newMonitorCmd(ctx, ch)
716+
_ = c(ctx, cmd)
717+
return cmd
718+
}

internal/proto/writer_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ var _ = Describe("WriteArg", func() {
112112
})
113113

114114
args := map[any]string{
115-
"hello": "$1\r\nhello\r\n",
115+
"hello": "$5\r\nhello\r\n",
116116
int(10): "$2\r\n10\r\n",
117117
util.ToPtr(int(10)): "$2\r\n10\r\n",
118118
int8(10): "$2\r\n10\r\n",
@@ -133,8 +133,8 @@ var _ = Describe("WriteArg", func() {
133133
util.ToPtr(uint32(10)): "$2\r\n10\r\n",
134134
uint64(10): "$2\r\n10\r\n",
135135
util.ToPtr(uint64(10)): "$2\r\n10\r\n",
136-
float32(10.3): "$4\r\n10.3\r\n",
137-
util.ToPtr(float32(10.3)): "$4\r\n10.3\r\n",
136+
float32(10.3): "$18\r\n10.300000190734863\r\n",
137+
util.ToPtr(float32(10.3)): "$18\r\n10.300000190734863\r\n",
138138
float64(10.3): "$4\r\n10.3\r\n",
139139
util.ToPtr(float64(10.3)): "$4\r\n10.3\r\n",
140140
bool(true): "$1\r\n1\r\n",
@@ -144,6 +144,7 @@ var _ = Describe("WriteArg", func() {
144144
}
145145

146146
for arg, expect := range args {
147+
arg, expect := arg, expect
147148
It(fmt.Sprintf("should write arg of type %T", arg), func() {
148149
err := wr.WriteArg(arg)
149150
Expect(err).NotTo(HaveOccurred())

main_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ var (
4141
redisAddr = ":" + redisPort
4242
)
4343

44+
var (
45+
rediStackPort = "6379"
46+
rediStackAddr = ":" + rediStackPort
47+
)
48+
4449
var (
4550
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
4651

monitor_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package redis_test
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
. "github.com/bsm/ginkgo/v2"
8+
. "github.com/bsm/gomega"
9+
10+
"github.com/redis/go-redis/v9"
11+
)
12+
13+
var _ = Describe("Monitor command", Label("monitor"), func() {
14+
ctx := context.TODO()
15+
var client *redis.Client
16+
17+
BeforeEach(func() {
18+
client = redis.NewClient(&redis.Options{Addr: ":6379"})
19+
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
20+
})
21+
22+
AfterEach(func() {
23+
Expect(client.Close()).NotTo(HaveOccurred())
24+
})
25+
26+
It("should monitor", Label("monitor"), func() {
27+
ress := make(chan string)
28+
client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr})
29+
mn := client1.Monitor(ctx, ress)
30+
mn.Start()
31+
// Wait for the Redis server to be in monitoring mode.
32+
time.Sleep(100 * time.Millisecond)
33+
client.Set(ctx, "foo", "bar", 0)
34+
client.Set(ctx, "bar", "baz", 0)
35+
client.Set(ctx, "bap", 8, 0)
36+
client.Get(ctx, "bap")
37+
lst := []string{}
38+
for i := 0; i < 5; i++ {
39+
s := <-ress
40+
lst = append(lst, s)
41+
}
42+
mn.Stop()
43+
Expect(lst[0]).To(ContainSubstring("OK"))
44+
Expect(lst[1]).To(ContainSubstring(`"set" "foo" "bar"`))
45+
Expect(lst[2]).To(ContainSubstring(`"set" "bar" "baz"`))
46+
Expect(lst[3]).To(ContainSubstring(`"set" "bap" "8"`))
47+
})
48+
})

redis.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"net"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -40,12 +41,15 @@ type (
4041
)
4142

4243
type hooksMixin struct {
44+
hooksMu *sync.Mutex
45+
4346
slice []Hook
4447
initial hooks
4548
current hooks
4649
}
4750

4851
func (hs *hooksMixin) initHooks(hooks hooks) {
52+
hs.hooksMu = new(sync.Mutex)
4953
hs.initial = hooks
5054
hs.chain()
5155
}
@@ -116,6 +120,9 @@ func (hs *hooksMixin) AddHook(hook Hook) {
116120
func (hs *hooksMixin) chain() {
117121
hs.initial.setDefaults()
118122

123+
hs.hooksMu.Lock()
124+
defer hs.hooksMu.Unlock()
125+
119126
hs.current.dial = hs.initial.dial
120127
hs.current.process = hs.initial.process
121128
hs.current.pipeline = hs.initial.pipeline
@@ -138,9 +145,13 @@ func (hs *hooksMixin) chain() {
138145
}
139146

140147
func (hs *hooksMixin) clone() hooksMixin {
148+
hs.hooksMu.Lock()
149+
defer hs.hooksMu.Unlock()
150+
141151
clone := *hs
142152
l := len(clone.slice)
143153
clone.slice = clone.slice[:l:l]
154+
clone.hooksMu = new(sync.Mutex)
144155
return clone
145156
}
146157

@@ -165,6 +176,8 @@ func (hs *hooksMixin) withProcessPipelineHook(
165176
}
166177

167178
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
179+
hs.hooksMu.Lock()
180+
defer hs.hooksMu.Unlock()
168181
return hs.current.dial(ctx, network, addr)
169182
}
170183

redis_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,3 +579,53 @@ var _ = Describe("Hook", func() {
579579
Expect(cmd.Val()).To(Equal("Script and hook"))
580580
})
581581
})
582+
583+
var _ = Describe("Hook with MinIdleConns", func() {
584+
var client *redis.Client
585+
586+
BeforeEach(func() {
587+
options := redisOptions()
588+
options.MinIdleConns = 1
589+
client = redis.NewClient(options)
590+
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
591+
})
592+
593+
AfterEach(func() {
594+
err := client.Close()
595+
Expect(err).NotTo(HaveOccurred())
596+
})
597+
598+
It("fifo", func() {
599+
var res []string
600+
client.AddHook(&hook{
601+
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
602+
return func(ctx context.Context, cmd redis.Cmder) error {
603+
res = append(res, "hook-1-process-start")
604+
err := hook(ctx, cmd)
605+
res = append(res, "hook-1-process-end")
606+
return err
607+
}
608+
},
609+
})
610+
client.AddHook(&hook{
611+
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
612+
return func(ctx context.Context, cmd redis.Cmder) error {
613+
res = append(res, "hook-2-process-start")
614+
err := hook(ctx, cmd)
615+
res = append(res, "hook-2-process-end")
616+
return err
617+
}
618+
},
619+
})
620+
621+
err := client.Ping(ctx).Err()
622+
Expect(err).NotTo(HaveOccurred())
623+
624+
Expect(res).To(Equal([]string{
625+
"hook-1-process-start",
626+
"hook-2-process-start",
627+
"hook-2-process-end",
628+
"hook-1-process-end",
629+
}))
630+
})
631+
})

0 commit comments

Comments
 (0)