Skip to content

Commit 54720e6

Browse files
committed
simplify pointer passing for interface in cursor
1 parent 0b3b717 commit 54720e6

File tree

3 files changed

+79
-15
lines changed

3 files changed

+79
-15
lines changed

pool/cursor.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package pool
22

3-
import "context"
3+
import (
4+
"context"
5+
"errors"
6+
"reflect"
7+
)
48

59
// Cursor provides synchronous access to async data from pool's response channel
610
type Cursor struct {
@@ -10,7 +14,7 @@ type Cursor struct {
1014

1115
// Next returns next result from the cursor, ok = false on completion.
1216
// Any error saved internally and can be returned by Err call
13-
func (c *Cursor) Next(ctx context.Context, v *interface{}) bool {
17+
func (c *Cursor) Next(ctx context.Context, v interface{}) bool {
1418
for {
1519
select {
1620
case resp, ok := <-c.ch:
@@ -21,7 +25,14 @@ func (c *Cursor) Next(ctx context.Context, v *interface{}) bool {
2125
c.err = resp.err
2226
continue
2327
}
24-
*v = resp.value
28+
29+
rv := reflect.ValueOf(v)
30+
if rv.Kind() != reflect.Ptr || rv.IsNil() {
31+
c.err = errors.New("value type is not pointer")
32+
return false
33+
}
34+
dstValue := reflect.Indirect(rv)
35+
dstValue.Set(reflect.ValueOf(resp.value))
2536
return ok
2637
case <-ctx.Done():
2738
c.err = ctx.Err()

pool/cursor_test.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,41 @@ import (
1010
)
1111

1212
func TestCursor_Next(t *testing.T) {
13-
c := Cursor{ch: make(chan response, 3)}
13+
14+
type structExample struct {
15+
k1 int
16+
k2 string
17+
k3 []bool
18+
}
19+
20+
c := Cursor{ch: make(chan response, 4)}
1421

1522
c.ch <- response{value: "12345"}
1623
c.ch <- response{value: "abc"}
17-
c.ch <- response{value: "xyz 0987"}
24+
c.ch <- response{value: 1234.567}
25+
c.ch <- response{value: structExample{k1: 12, k2: "abcd", k3: []bool{true, false}}}
1826
close(c.ch)
1927

20-
var v interface{}
21-
next := c.Next(context.Background(), &v)
28+
var s string
29+
next := c.Next(context.Background(), &s)
30+
assert.True(t, next)
31+
assert.Equal(t, "12345", s)
32+
33+
next = c.Next(context.Background(), &s)
2234
assert.True(t, next)
23-
assert.Equal(t, "12345", v.(string))
35+
assert.Equal(t, "abc", s)
2436

25-
next = c.Next(context.Background(), &v)
37+
var f float64
38+
next = c.Next(context.Background(), &f)
2639
assert.True(t, next)
27-
assert.Equal(t, "abc", v.(string))
40+
assert.Equal(t, 1234.567, f)
2841

29-
next = c.Next(context.Background(), &v)
42+
var ss structExample
43+
next = c.Next(context.Background(), &ss)
3044
assert.True(t, next)
31-
assert.Equal(t, "xyz 0987", v.(string))
45+
assert.Equal(t, structExample{k1: 12, k2: "abcd", k3: []bool{true, false}}, ss)
3246

33-
next = c.Next(context.Background(), &v)
47+
next = c.Next(context.Background(), nil)
3448
assert.False(t, next)
3549
}
3650

pool/pool_test.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,45 @@ func TestPool(t *testing.T) {
119119
}
120120
}
121121

122+
func TestPoolWithStruct(t *testing.T) {
123+
124+
type resp struct {
125+
k1 string
126+
k2 int
127+
k3 string
128+
k4 []string
129+
k5 bool
130+
}
131+
132+
p := New(4, func(ctx context.Context, inpRec interface{}, sender SenderFn, store WorkerStore) error {
133+
i := inpRec.(int)
134+
r := resp{k1: "rec" + strconv.Itoa(i), k2: i, k3: "something", k4: []string{"foo", "bar"}, k5: true}
135+
return sender(r)
136+
})
137+
138+
go func() {
139+
for i := 0; i < 1000; i++ {
140+
p.Submit(i)
141+
time.Sleep(time.Millisecond * time.Duration(rand.Intn(3))) //nolint gosec
142+
}
143+
p.Close()
144+
}()
145+
c, err := p.Go(context.Background())
146+
require.NoError(t, err)
147+
148+
var recs []resp
149+
var v resp
150+
for c.Next(context.Background(), &v) {
151+
recs = append(recs, v)
152+
}
153+
require.NoError(t, c.Err())
154+
assert.Equal(t, 1000, len(recs))
155+
156+
sort.Slice(recs, func(i, j int) bool { return recs[i].k2 < recs[j].k2 })
157+
assert.Equal(t, resp{k1: "rec0", k2: 0, k3: "something", k4: []string{"foo", "bar"}, k5: true}, recs[0])
158+
assert.Equal(t, resp{k1: "rec999", k2: 999, k3: "something", k4: []string{"foo", "bar"}, k5: true}, recs[999])
159+
}
160+
122161
func TestPoolWithStore(t *testing.T) {
123162

124163
worker := func(ctx context.Context, v interface{}, send SenderFn, store WorkerStore) error {
@@ -393,9 +432,9 @@ func ExampleWorkers_withOptions() {
393432
}()
394433

395434
// consume results in streaming mode
396-
var v interface{}
435+
var v string
397436
for cursor.Next(context.TODO(), &v) {
398-
log.Printf("%v", v)
437+
log.Printf("%s", v)
399438
}
400439

401440
// show metrics

0 commit comments

Comments
 (0)