Skip to content

Commit 4ed8180

Browse files
authored
ringhash: more e2e tests from c-core (#7334)
1 parent 61aa949 commit 4ed8180

File tree

3 files changed

+1076
-200
lines changed

3 files changed

+1076
-200
lines changed

internal/testutils/blocking_context_dialer.go

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,36 +21,109 @@ package testutils
2121
import (
2222
"context"
2323
"net"
24+
"sync"
25+
26+
"google.golang.org/grpc/grpclog"
2427
)
2528

29+
var logger = grpclog.Component("testutils")
30+
2631
// BlockingDialer is a dialer that waits for Resume() to be called before
2732
// dialing.
2833
type BlockingDialer struct {
29-
dialer *net.Dialer
30-
blockCh chan struct{}
34+
// mu protects holds.
35+
mu sync.Mutex
36+
// holds maps network addresses to a list of holds for that address.
37+
holds map[string][]*Hold
3138
}
3239

3340
// NewBlockingDialer returns a dialer that waits for Resume() to be called
3441
// before dialing.
3542
func NewBlockingDialer() *BlockingDialer {
3643
return &BlockingDialer{
37-
dialer: &net.Dialer{},
38-
blockCh: make(chan struct{}),
44+
holds: make(map[string][]*Hold),
3945
}
4046
}
4147

4248
// DialContext implements a context dialer for use with grpc.WithContextDialer
4349
// dial option for a BlockingDialer.
4450
func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
51+
d.mu.Lock()
52+
holds := d.holds[addr]
53+
if len(holds) == 0 {
54+
// No hold for this addr.
55+
d.mu.Unlock()
56+
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
57+
}
58+
hold := holds[0]
59+
d.holds[addr] = holds[1:]
60+
d.mu.Unlock()
61+
62+
logger.Infof("Hold %p: Intercepted connection attempt to addr %q", hold, addr)
63+
close(hold.waitCh)
4564
select {
46-
case <-d.blockCh:
65+
case err := <-hold.blockCh:
66+
if err != nil {
67+
return nil, err
68+
}
69+
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
4770
case <-ctx.Done():
71+
logger.Infof("Hold %p: Connection attempt to addr %q timed out", hold, addr)
4872
return nil, ctx.Err()
4973
}
50-
return d.dialer.DialContext(ctx, "tcp", addr)
5174
}
5275

53-
// Resume unblocks the dialer. It panics if called more than once.
54-
func (d *BlockingDialer) Resume() {
55-
close(d.blockCh)
76+
// Hold is a handle to a single connection attempt. It can be used to block,
77+
// fail and succeed connection attempts.
78+
type Hold struct {
79+
// dialer is the dialer that created this hold.
80+
dialer *BlockingDialer
81+
// waitCh is closed when a connection attempt is received.
82+
waitCh chan struct{}
83+
// blockCh receives the value to return from DialContext for this connection
84+
// attempt (nil on resume, an error on fail). It receives at most 1 value.
85+
blockCh chan error
86+
// addr is the address that this hold is for.
87+
addr string
88+
}
89+
90+
// Hold blocks the dialer when a connection attempt is made to the given addr.
91+
// A hold is valid for exactly one connection attempt. Multiple holds for an
92+
// addr can be added, and they will apply in the order that the connections are
93+
// attempted.
94+
func (d *BlockingDialer) Hold(addr string) *Hold {
95+
d.mu.Lock()
96+
defer d.mu.Unlock()
97+
98+
h := Hold{dialer: d, blockCh: make(chan error, 1), waitCh: make(chan struct{}), addr: addr}
99+
d.holds[addr] = append(d.holds[addr], &h)
100+
return &h
101+
}
102+
103+
// Wait blocks until there is a connection attempt on this Hold, or the context
104+
// expires. Return false if the context has expired, true otherwise.
105+
func (h *Hold) Wait(ctx context.Context) bool {
106+
logger.Infof("Hold %p: Waiting for a connection attempt to addr %q", h, h.addr)
107+
select {
108+
case <-ctx.Done():
109+
return false
110+
case <-h.waitCh:
111+
return true
112+
}
113+
}
114+
115+
// Resume unblocks the dialer for the given addr. Either Resume or Fail must be
116+
// called at most once on a hold. Otherwise, Resume panics.
117+
func (h *Hold) Resume() {
118+
logger.Infof("Hold %p: Resuming connection attempt to addr %q", h, h.addr)
119+
h.blockCh <- nil
120+
close(h.blockCh)
121+
}
122+
123+
// Fail fails the connection attempt. Either Resume or Fail must be
124+
// called at most once on a hold. Otherwise, Resume panics.
125+
func (h *Hold) Fail(err error) {
126+
logger.Infof("Hold %p: Failing connection attempt to addr %q", h, h.addr)
127+
h.blockCh <- err
128+
close(h.blockCh)
56129
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package testutils
20+
21+
import (
22+
"context"
23+
"errors"
24+
"testing"
25+
"time"
26+
)
27+
28+
const (
29+
testTimeout = 5 * time.Second
30+
testShortTimeout = 10 * time.Millisecond
31+
)
32+
33+
func (s) TestBlockingDialer_NoHold(t *testing.T) {
34+
lis, err := LocalTCPListener()
35+
if err != nil {
36+
t.Fatalf("Failed to listen: %v", err)
37+
}
38+
defer lis.Close()
39+
40+
d := NewBlockingDialer()
41+
42+
// This should not block.
43+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
44+
defer cancel()
45+
conn, err := d.DialContext(ctx, lis.Addr().String())
46+
if err != nil {
47+
t.Fatalf("Failed to dial: %v", err)
48+
}
49+
conn.Close()
50+
}
51+
52+
func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
53+
lis, err := LocalTCPListener()
54+
if err != nil {
55+
t.Fatalf("Failed to listen: %v", err)
56+
}
57+
defer lis.Close()
58+
59+
d := NewBlockingDialer()
60+
h := d.Hold(lis.Addr().String())
61+
62+
done := make(chan struct{})
63+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
64+
defer cancel()
65+
go func() {
66+
defer close(done)
67+
conn, err := d.DialContext(ctx, lis.Addr().String())
68+
if err != nil {
69+
t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err)
70+
return
71+
}
72+
conn.Close()
73+
}()
74+
75+
// This should block until the goroutine above is scheduled.
76+
if !h.Wait(ctx) {
77+
t.Fatalf("Timeout while waiting for a connection attempt to %q", h.addr)
78+
}
79+
select {
80+
case <-done:
81+
t.Fatalf("Expected dialer to be blocked.")
82+
case <-time.After(testShortTimeout):
83+
}
84+
85+
h.Resume() // Unblock the above goroutine.
86+
87+
select {
88+
case <-done:
89+
case <-ctx.Done():
90+
t.Errorf("Timeout waiting for connection attempt to resume.")
91+
}
92+
}
93+
94+
func (s) TestBlockingDialer_HoldWaitFail(t *testing.T) {
95+
lis, err := LocalTCPListener()
96+
if err != nil {
97+
t.Fatalf("Failed to listen: %v", err)
98+
}
99+
defer lis.Close()
100+
101+
d := NewBlockingDialer()
102+
h := d.Hold(lis.Addr().String())
103+
104+
wantErr := errors.New("test error")
105+
106+
dialError := make(chan error)
107+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
108+
defer cancel()
109+
go func() {
110+
_, err := d.DialContext(ctx, lis.Addr().String())
111+
dialError <- err
112+
}()
113+
114+
if !h.Wait(ctx) {
115+
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
116+
}
117+
select {
118+
case err = <-dialError:
119+
t.Errorf("DialContext got unblocked with err %v. Want DialContext to still be blocked after Wait()", err)
120+
case <-time.After(testShortTimeout):
121+
}
122+
123+
h.Fail(wantErr)
124+
125+
select {
126+
case err = <-dialError:
127+
if !errors.Is(err, wantErr) {
128+
t.Errorf("BlockingDialer.DialContext() after Fail(): got error %v, want %v", err, wantErr)
129+
}
130+
case <-ctx.Done():
131+
t.Errorf("Timeout waiting for connection attempt to fail.")
132+
}
133+
}
134+
135+
func (s) TestBlockingDialer_ContextCanceled(t *testing.T) {
136+
lis, err := LocalTCPListener()
137+
if err != nil {
138+
t.Fatalf("Failed to listen: %v", err)
139+
}
140+
defer lis.Close()
141+
142+
d := NewBlockingDialer()
143+
h := d.Hold(lis.Addr().String())
144+
145+
dialErr := make(chan error)
146+
testCtx, cancel := context.WithTimeout(context.Background(), testTimeout)
147+
defer cancel()
148+
149+
ctx, cancel := context.WithCancel(testCtx)
150+
defer cancel()
151+
go func() {
152+
_, err := d.DialContext(ctx, lis.Addr().String())
153+
dialErr <- err
154+
}()
155+
if !h.Wait(testCtx) {
156+
t.Errorf("Timeout while waiting for a connection attempt to %q", h.addr)
157+
}
158+
159+
cancel()
160+
161+
select {
162+
case err = <-dialErr:
163+
if !errors.Is(err, context.Canceled) {
164+
t.Errorf("BlockingDialer.DialContext() after context cancel: got error %v, want %v", err, context.Canceled)
165+
}
166+
case <-testCtx.Done():
167+
t.Errorf("Timeout while waiting for Wait to return.")
168+
}
169+
170+
h.Resume() // noop, just make sure nothing bad happen.
171+
}
172+
173+
func (s) TestBlockingDialer_CancelWait(t *testing.T) {
174+
lis, err := LocalTCPListener()
175+
if err != nil {
176+
t.Fatalf("Failed to listen: %v", err)
177+
}
178+
defer lis.Close()
179+
180+
d := NewBlockingDialer()
181+
h := d.Hold(lis.Addr().String())
182+
183+
testCtx, cancel := context.WithTimeout(context.Background(), testTimeout)
184+
defer cancel()
185+
186+
ctx, cancel := context.WithCancel(testCtx)
187+
cancel()
188+
done := make(chan struct{})
189+
go func() {
190+
if h.Wait(ctx) {
191+
t.Errorf("Expected cancel to return false when context expires")
192+
}
193+
done <- struct{}{}
194+
}()
195+
196+
select {
197+
case <-done:
198+
case <-testCtx.Done():
199+
t.Errorf("Timeout while waiting for Wait to return.")
200+
}
201+
}

0 commit comments

Comments
 (0)