Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: exit when FUSE errors on accept #694

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 73 additions & 10 deletions internal/proxy/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,34 @@ func randTmpDir(t interface {
// newTestClient is a convenience function for testing that creates a
// proxy.Client and starts it. The returned cleanup function is also a
// convenience. Callers may choose to ignore it and manually close the client.
func newTestClient(t *testing.T, d alloydb.Dialer, fuseDir, fuseTempDir string) (*proxy.Client, func()) {
func newTestClient(t *testing.T, d alloydb.Dialer, fuseDir, fuseTempDir string) (*proxy.Client, chan error, func()) {
conf := &proxy.Config{FUSEDir: fuseDir, FUSETempDir: fuseTempDir}
c, err := proxy.NewClient(context.Background(), d, testLogger, conf)
if err != nil {
t.Fatalf("want error = nil, got = %v", err)
}

ready := make(chan struct{})
go c.Serve(context.Background(), func() { close(ready) })
servErrCh := make(chan error)

ctx, cancel := context.WithCancel(context.Background())
go func() {
servErr := c.Serve(ctx, func() { close(ready) })
select {
case servErrCh <- servErr:
case <-ctx.Done():
}
}()
select {
case <-ready:
case <-time.Tick(5 * time.Second):
t.Fatal("failed to Serve")
}
return c, func() {
return c, servErrCh, func() {
if cErr := c.Close(); cErr != nil {
t.Logf("failed to close client: %v", cErr)
}
cancel()
}
}

Expand All @@ -70,7 +80,7 @@ func TestFUSEREADME(t *testing.T) {
}
dir := randTmpDir(t)
d := &fakeDialer{}
_, cleanup := newTestClient(t, d, dir, randTmpDir(t))
_, _, cleanup := newTestClient(t, d, dir, randTmpDir(t))

fi, err := os.Stat(dir)
if err != nil {
Expand Down Expand Up @@ -159,7 +169,7 @@ func TestFUSEDialInstance(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
d := &fakeDialer{}
_, cleanup := newTestClient(t, d, fuseDir, tc.fuseTempDir)
_, _, cleanup := newTestClient(t, d, fuseDir, tc.fuseTempDir)
defer cleanup()

conn := tryDialUnix(t, tc.socketPath)
Expand All @@ -184,12 +194,65 @@ func TestFUSEDialInstance(t *testing.T) {
}
}

// TestFUSEAcceptErrorReturnedFromServe verifies that Client.Serve returns a
// non-nil error after the client is closed while a FUSE-backed unix socket
// has an established connection.
func TestFUSEAcceptErrorReturnedFromServe(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping fuse tests in short mode.")
	}

	fuseDir := randTmpDir(t)
	fuseTempDir := randTmpDir(t)
	socketPath := postgresSocketPath(fuseDir, "proj.region.cluster.instance")

	// Create a new client
	d := &fakeDialer{}
	c, servErrCh, cleanup := newTestClient(t, d, fuseDir, fuseTempDir)
	defer cleanup()

	// Attempt a successful connection to the client
	conn := tryDialUnix(t, socketPath)
	defer conn.Close()

	// Ensure that the client actually fully connected.
	// This solves a race condition in the test that is only present on
	// the Ubuntu-Latest platform.
	var got []string
	for i := 0; i < 10; i++ {
		got = d.dialedInstances()
		if len(got) == 1 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	if len(got) != 1 {
		t.Fatalf("dialed instances len: want = 1, got = %v", got)
	}

	// Explicitly close the client. This will close all the unix sockets,
	// forcing the unix socket accept goroutine to exit with an error.
	// A failure to close is logged rather than treated as fatal because the
	// assertion of interest is the error returned by Serve below.
	if cErr := c.Close(); cErr != nil {
		t.Logf("failed to close client: %v", cErr)
	}

	// Check that Client.Serve() returned a non-nil error. Block on the
	// channel with a one second timeout instead of polling it.
	select {
	case servErr := <-servErrCh:
		if servErr == nil {
			t.Fatal("got nil, want non-nil error returned by Client.Serve()")
		}
	case <-time.After(time.Second):
		t.Fatal("No error thrown by Client.Serve()")
	}
}

func TestFUSEReadDir(t *testing.T) {
if testing.Short() {
t.Skip("skipping fuse tests in short mode.")
}
fuseDir := randTmpDir(t)
_, cleanup := newTestClient(t, &fakeDialer{}, fuseDir, randTmpDir(t))
_, _, cleanup := newTestClient(t, &fakeDialer{}, fuseDir, randTmpDir(t))
defer cleanup()

// Initiate a connection so the FUSE server will list it in the dir entries.
Expand Down Expand Up @@ -219,7 +282,7 @@ func TestFUSEWithBadInstanceName(t *testing.T) {
}
fuseDir := randTmpDir(t)
d := &fakeDialer{}
_, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
_, _, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
defer cleanup()

_, dialErr := net.Dial("unix", filepath.Join(fuseDir, "notvalid"))
Expand All @@ -238,7 +301,7 @@ func TestFUSECheckConnections(t *testing.T) {
}
fuseDir := randTmpDir(t)
d := &fakeDialer{}
c, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
c, _, cleanup := newTestClient(t, d, fuseDir, randTmpDir(t))
defer cleanup()

// first establish a connection to "register" it with the proxy
Expand Down Expand Up @@ -273,7 +336,7 @@ func TestFUSEClose(t *testing.T) {
}
fuseDir := randTmpDir(t)
d := &fakeDialer{}
c, _ := newTestClient(t, d, fuseDir, randTmpDir(t))
c, _, _ := newTestClient(t, d, fuseDir, randTmpDir(t))

// first establish a connection to "register" it with the proxy
conn := tryDialUnix(t, postgresSocketPath(fuseDir, "proj.region.cluster.instance"))
Expand Down Expand Up @@ -306,7 +369,7 @@ func TestLookupIgnoresContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
d := &fakeDialer{}
c, _ := newTestClient(t, d, randTmpDir(t), randTmpDir(t))
c, _, _ := newTestClient(t, d, randTmpDir(t), randTmpDir(t))

// invoke Lookup with cancelled context, should ignore context and succeed
_, err := c.Lookup(ctx, "proj.region.cluster.instance", nil)
Expand Down
22 changes: 19 additions & 3 deletions internal/proxy/proxy_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type fuseMount struct {
fuseServerMu *sync.Mutex
fuseServer *fuse.Server
fuseWg *sync.WaitGroup
fuseExitCh chan error

// Inode adds support for FUSE operations.
fs.Inode
Expand Down Expand Up @@ -131,8 +132,18 @@ func (c *Client) Lookup(_ context.Context, instance string, _ *fuse.EntryOut) (*
sErr := c.serveSocketMount(ctx, s)
if sErr != nil {
c.fuseMu.Lock()
defer c.fuseMu.Unlock()
delete(c.fuseSockets, instance)
c.fuseMu.Unlock()
select {
// Best effort attempt to send error.
// If this send fails, it means the reading goroutine has
// already pulled a value out of the channel and is no longer
// reading any more values. In other words, we report only the
// first error.
case c.fuseExitCh <- sErr:
default:
return
}
}
}()

Expand Down Expand Up @@ -163,10 +174,15 @@ func (c *Client) serveFuse(ctx context.Context, notify func()) error {
}
c.fuseServerMu.Lock()
c.fuseServer = srv
c.fuseExitCh = make(chan error)
c.fuseServerMu.Unlock()
notify()
<-ctx.Done()
return ctx.Err()
select {
case err = <-c.fuseExitCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (c *Client) fuseMounts() []*socketMount {
Expand Down
Loading