Skip to content

Commit 321d866

Browse files
committed
ringhash: more e2e tests from c-core
Follow up to #7271 to fix #6072. This adds a dozen more end to end tests. There are tests that I did not port, specifically: - TestRingHash_TransientFailureSkipToAvailableReady was flaky when I ported it, so I removed it while investigating. - TestRingHash_SwitchToLowerPriorityAndThenBack was also flaky, I also removed it while investigating. - TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime, I'm not sure we implement this behavior, and if we do, it's not working the same way as in c-core, where the order of subchannel connection attempts is based on the resolver address order rather than the ring order. I will follow up with fixes for each one of the remaining tests.
1 parent c04b085 commit 321d866

File tree

3 files changed

+1148
-144
lines changed

3 files changed

+1148
-144
lines changed

internal/testutils/blocking_context_dialer.go

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,36 +21,105 @@ package testutils
2121
import (
2222
"context"
2323
"net"
24+
"sync"
25+
26+
"google.golang.org/grpc/grpclog"
2427
)
2528

29+
var logger = grpclog.Component("testutils")
30+
2631
// BlockingDialer is a dialer that waits for Resume() to be called before
2732
// dialing.
2833
type BlockingDialer struct {
29-
dialer *net.Dialer
30-
blockCh chan struct{}
34+
mu sync.Mutex // protects holds
35+
holds map[string][]*Hold
36+
37+
dialer *net.Dialer
3138
}
3239

3340
// NewBlockingDialer returns a dialer that waits for Resume() to be called
3441
// before dialing.
3542
func NewBlockingDialer() *BlockingDialer {
3643
return &BlockingDialer{
37-
dialer: &net.Dialer{},
38-
blockCh: make(chan struct{}),
44+
dialer: &net.Dialer{},
45+
holds: make(map[string][]*Hold),
3946
}
4047
}
4148

4249
// DialContext implements a context dialer for use with grpc.WithContextDialer
4350
// dial option for a BlockingDialer.
4451
func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
52+
d.mu.Lock()
53+
holds := d.holds[addr]
54+
if len(holds) > 0 {
55+
logger.Info("Intercepted connection attempt to addr %s", addr)
56+
hold := holds[0]
57+
d.holds[addr] = holds[1:]
58+
d.mu.Unlock()
59+
60+
close(hold.waitCh)
61+
select {
62+
case <-hold.blockCh:
63+
if hold.err != nil {
64+
return nil, hold.err
65+
}
66+
return d.dialer.DialContext(ctx, "tcp", addr)
67+
case <-ctx.Done():
68+
return nil, ctx.Err()
69+
}
70+
}
71+
// No hold for this addr.
72+
d.mu.Unlock()
73+
return d.dialer.DialContext(ctx, "tcp", addr)
74+
}
75+
76+
// Hold is a connection hold that blocks the dialer when a connection attempt is
77+
// made to the given addr.
78+
type Hold struct {
79+
dialer *BlockingDialer
80+
blockCh chan error
81+
waitCh chan struct{}
82+
err error
83+
addr string
84+
}
85+
86+
// Hold blocks the dialer when a connection attempt is made to the given addr.
87+
// A hold is valid for exactly one connection attempt. Multiple holds for an
88+
// addr can be added, and they will apply in the order that the connection are
89+
// attempted.
90+
func (d *BlockingDialer) Hold(addr string) *Hold {
91+
d.mu.Lock()
92+
defer d.mu.Unlock()
93+
94+
h := Hold{dialer: d, blockCh: make(chan error), waitCh: make(chan struct{}), addr: addr}
95+
d.holds[addr] = append(d.holds[addr], &h)
96+
return &h
97+
}
98+
99+
// Wait returns a channel that blocks until there is a connection attempt on
100+
// this Hold. Return false if the context has expired, true otherwise.
101+
func (h *Hold) Wait(ctx context.Context) bool {
102+
logger.Infof("Waiting for a connection attempt to addr %s", h.addr)
45103
select {
46-
case <-d.blockCh:
47104
case <-ctx.Done():
48-
return nil, ctx.Err()
105+
return false
106+
case <-h.waitCh:
49107
}
50-
return d.dialer.DialContext(ctx, "tcp", addr)
108+
logger.Infof("Connection attempt started to addr %s", h.addr)
109+
return true
110+
}
111+
112+
// Resume unblocks the dialer for the given addr. If called multiple times on
113+
// the same hold, Resume panics.
114+
func (h *Hold) Resume() {
115+
logger.Infof("Resuming connection attempt to addr %s", h.addr)
116+
close(h.blockCh)
51117
}
52118

53-
// Resume unblocks the dialer. It panics if called more than once.
54-
func (d *BlockingDialer) Resume() {
55-
close(d.blockCh)
119+
// Fail fails the connection attempt. If called multiple times on the same hold,
120+
// Fail panics.
121+
func (h *Hold) Fail(err error) {
122+
logger.Infof("Failing connection attempt to addr %s", h.addr)
123+
h.err = err
124+
close(h.blockCh)
56125
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package testutils
20+
21+
import (
22+
"context"
23+
"errors"
24+
"testing"
25+
"time"
26+
)
27+
28+
const testTimeout = 5 * time.Second
29+
30+
func (s) TestBlockingDialer_NoHold(t *testing.T) {
31+
lis, err := LocalTCPListener()
32+
if err != nil {
33+
t.Fatalf("failed to listen: %v", err)
34+
}
35+
defer lis.Close()
36+
37+
d := NewBlockingDialer()
38+
39+
// This should not block.
40+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
41+
defer cancel()
42+
conn, err := d.DialContext(ctx, lis.Addr().String())
43+
if err != nil {
44+
t.Errorf("unexpected error: %v", err)
45+
}
46+
conn.Close()
47+
}
48+
49+
func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
50+
lis, err := LocalTCPListener()
51+
if err != nil {
52+
t.Fatalf("failed to listen: %v", err)
53+
}
54+
defer lis.Close()
55+
56+
d := NewBlockingDialer()
57+
h := d.Hold(lis.Addr().String())
58+
59+
done := make(chan struct{})
60+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
61+
defer cancel()
62+
go func() {
63+
conn, err := d.DialContext(ctx, lis.Addr().String())
64+
if err != nil {
65+
t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err)
66+
}
67+
conn.Close()
68+
done <- struct{}{}
69+
}()
70+
71+
// This should block until the goroutine above is scheduled.
72+
if !h.Wait(ctx) {
73+
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
74+
}
75+
select {
76+
case <-done:
77+
t.Errorf("Expected dialer to be blocked.")
78+
default:
79+
}
80+
81+
h.Resume() // Unblock the above goroutine.
82+
83+
select {
84+
case <-done:
85+
case <-ctx.Done():
86+
t.Errorf("Timeout waiting for connection attempt to resume.")
87+
}
88+
}
89+
90+
func (s) TestBlockingDialer_HoldWaitFail(t *testing.T) {
91+
lis, err := LocalTCPListener()
92+
if err != nil {
93+
t.Fatalf("failed to listen: %v", err)
94+
}
95+
defer lis.Close()
96+
97+
d := NewBlockingDialer()
98+
h := d.Hold(lis.Addr().String())
99+
100+
wantErr := errors.New("test error")
101+
102+
done := make(chan struct{})
103+
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
104+
defer cancel()
105+
go func() {
106+
_, err := d.DialContext(ctx, lis.Addr().String())
107+
if !errors.Is(err, wantErr) {
108+
t.Errorf("BlockingDialer.DialContext() after Fail(): got error %v, want %v", err, wantErr)
109+
}
110+
done <- struct{}{}
111+
}()
112+
113+
if !h.Wait(ctx) {
114+
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
115+
}
116+
select {
117+
case <-done:
118+
t.Errorf("Expected dialer to still be blocked after Wait()")
119+
default:
120+
}
121+
122+
h.Fail(wantErr)
123+
124+
select {
125+
case <-done:
126+
case <-ctx.Done():
127+
t.Errorf("Timeout waiting for connection attempt to fail.")
128+
}
129+
}
130+
131+
func (s) TestBlockingDialer_ContextCanceled(t *testing.T) {
132+
lis, err := LocalTCPListener()
133+
if err != nil {
134+
t.Fatalf("failed to listen: %v", err)
135+
}
136+
defer lis.Close()
137+
138+
d := NewBlockingDialer()
139+
h := d.Hold(lis.Addr().String())
140+
141+
done := make(chan struct{})
142+
testCtx, cancel := context.WithTimeout(context.Background(), testTimeout)
143+
defer cancel()
144+
145+
ctx, cancel := context.WithCancel(testCtx)
146+
defer cancel()
147+
go func() {
148+
_, err := d.DialContext(ctx, lis.Addr().String())
149+
if !errors.Is(err, context.Canceled) {
150+
t.Errorf("BlockingDialer.DialContext() after context cancel: got error %v, want %v", err, context.Canceled)
151+
}
152+
done <- struct{}{}
153+
}()
154+
if !h.Wait(ctx) {
155+
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
156+
}
157+
cancel()
158+
159+
select {
160+
case <-done:
161+
case <-testCtx.Done():
162+
t.Errorf("Timeout while waiting for Wait to return.")
163+
}
164+
}
165+
166+
func (s) TestBlockingDialer_CancelWait(t *testing.T) {
167+
lis, err := LocalTCPListener()
168+
if err != nil {
169+
t.Fatalf("failed to listen: %v", err)
170+
}
171+
defer lis.Close()
172+
173+
d := NewBlockingDialer()
174+
h := d.Hold(lis.Addr().String())
175+
176+
testCtx, cancel := context.WithTimeout(context.Background(), testTimeout)
177+
defer cancel()
178+
179+
ctx, cancel := context.WithTimeout(testCtx, 0)
180+
defer cancel()
181+
done := make(chan struct{})
182+
go func() {
183+
if h.Wait(ctx) {
184+
t.Errorf("Expected cancel to return false when context expires")
185+
}
186+
done <- struct{}{}
187+
}()
188+
189+
select {
190+
case <-done:
191+
case <-testCtx.Done():
192+
t.Errorf("Timeout while waiting for Wait to return.")
193+
}
194+
}

0 commit comments

Comments
 (0)