Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate initial connection race condition in win32PipeListener (addresses #83) #84

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,24 @@ func makeWin32File(h syscall.Handle) (*win32File, error) {
return f, nil
}

// If a handle previously associated with a successfully constructed win32File
// needs to be recycled, use _this_ constructor. It doesn't re-add the handle
// to the ioCompletionPort.

func reuseWin32File(h syscall.Handle) *win32File {
f := &win32File{handle: h}
f.readDeadline.channel = make(timeoutChan)
f.writeDeadline.channel = make(timeoutChan)
return f
}

func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) {
return makeWin32File(h)
}

// closeHandle closes the resources associated with a Win32 handle
func (f *win32File) closeHandle() {
func (f *win32File) nilHandleReturning() syscall.Handle {
var ret syscall.Handle
f.wgLock.Lock()
// Atomically set that we are closing, releasing the resources only once.
if !f.closing.swap(true) {
Expand All @@ -122,16 +134,20 @@ func (f *win32File) closeHandle() {
cancelIoEx(f.handle, nil)
f.wg.Wait()
// at this point, no new IO can start
syscall.Close(f.handle)
ret = f.handle
f.handle = 0
} else {
f.wgLock.Unlock()
}
return ret
}

// Close closes a win32File.
func (f *win32File) Close() error {
f.closeHandle()
handle := f.nilHandleReturning()
if handle != 0 {
syscall.Close(handle)
}
return nil
}

Expand Down
151 changes: 120 additions & 31 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"io"
"net"
"os"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
)

//sys connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) = ConnectNamedPipe
//sys disconnectNamedPipe(pipe syscall.Handle) (err error) = DisconnectNamedPipe
//sys createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateNamedPipeW
//sys createFile(name string, access uint32, mode uint32, sa *syscall.SecurityAttributes, createmode uint32, attrs uint32, templatefile syscall.Handle) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateFileW
//sys waitNamedPipe(name string, timeout uint32) (err error) = WaitNamedPipeW
Expand Down Expand Up @@ -54,6 +57,12 @@ var (
type win32Pipe struct {
*win32File
path string
// If this instance of a pipe was created by a listener, the Close()
// method may attempt to return its instance to the listener to be
// re-used iff the listener does not already have another instance
// prepared for connection, usually as the consequence of an error
// returned by createNamedPipe.
listener *win32PipeListener
}

type win32MessageBytePipe struct {
Expand All @@ -78,6 +87,44 @@ func (f *win32Pipe) SetDeadline(t time.Time) error {
return nil
}

// This somewhat overrides the win32File implementation, because we're
// sometimes responsible for keeping at least one pipe instance open so
// this process can retain its claim on the pipe name.

func (f *win32Pipe) Close() error {
// Not all instances of win32Pipe have a listener. In particular,
// instances created with DialPipe definitely don't have a listener.
if f.listener == nil {
return f.win32File.Close()
}
f.listener.nextLock.Lock()
var (
listenerOpen bool
nextPipe *win32File
)
select {
case <- f.listener.doneCh:
// pass, default for bool is false
default:
listenerOpen = true
}
nextPipe = (*win32File)(atomic.LoadPointer(&f.listener.nextPipe))
// If the nextPipe is not nil, this means the listenerRoutine managed
// to successfully fill it. We don't need to touch it in this case.
if nextPipe != nil || !listenerOpen {
f.listener.nextLock.Unlock()
return f.win32File.Close()
}
handle := f.win32File.nilHandleReturning()
disconnectNamedPipe(handle)
// Simply reconnecting the pipe will keep the instance open, meaning
// we keep our name in the pipe namespace.
nextPipe = reuseWin32File(handle)
atomic.StorePointer(&f.listener.nextPipe, unsafe.Pointer(nextPipe))
f.listener.nextLock.Unlock()
return nil
}

// CloseWrite closes the write side of a message pipe in byte mode.
func (f *win32MessageBytePipe) CloseWrite() error {
if f.writeClosed {
Expand Down Expand Up @@ -202,13 +249,16 @@ type acceptResponse struct {
}

type win32PipeListener struct {
firstHandle syscall.Handle
// this is actually a *win32File, but because of the use of atomic
// we have to keep this as an unsafe.Pointer
nextPipe unsafe.Pointer
path string
securityDescriptor []byte
config PipeConfig
acceptCh chan (chan acceptResponse)
closeCh chan int
doneCh chan int
nextLock sync.Mutex
}

func makeServerPipeHandle(path string, securityDescriptor []byte, c *PipeConfig, first bool) (syscall.Handle, error) {
Expand Down Expand Up @@ -237,8 +287,8 @@ func makeServerPipeHandle(path string, securityDescriptor []byte, c *PipeConfig,
return h, nil
}

func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
h, err := makeServerPipeHandle(l.path, l.securityDescriptor, &l.config, false)
func makeServerPipeFirst(path string, securityDescriptor []byte, c *PipeConfig) (*win32File, error) {
h, err := makeServerPipeHandle(path, securityDescriptor, c, true)
if err != nil {
return nil, err
}
Expand All @@ -250,63 +300,106 @@ func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
return f, nil
}

func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
p, err := l.makeServerPipe()
func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
h, err := makeServerPipeHandle(l.path, l.securityDescriptor, &l.config, false)
if err != nil {
return nil, err
}
f, err := makeWin32File(h)
if err != nil {
syscall.Close(h)
return nil, err
}
return f, nil
}

func (l *win32PipeListener) connectServerPipe(pipe *win32File) error {
var err error

// Wait for the client to connect.
ch := make(chan error)
go func(p *win32File) {
ch <- connectPipe(p)
}(p)

}(pipe)
select {
case err = <-ch:
if err != nil {
p.Close()
p = nil
disconnectNamedPipe(pipe.handle)
}
case <-l.closeCh:
// Abort the connect request by closing the handle.
p.Close()
p = nil
pipe.Close()
// Note that we aren't nil-ing out l.nextPipe, it's
// harmless to .Close() on the file more than once.
err = <-ch
if err == nil || err == ErrFileClosed {
if err == nil || err == ErrFileClosed || pipeWasConnected(err) {
err = ErrPipeListenerClosed
}
}
return p, err
return err
}

func pipeWasConnected(err error) bool {
return err == cERROR_NO_DATA || err == cERROR_PIPE_CONNECTED
}

func (l *win32PipeListener) listenerRoutine() {
closed := false
var nextErr error
for !closed {
select {
case <-l.closeCh:
closed = true
case responseCh := <-l.acceptCh:
var (
p *win32File
err error
nextPipe *win32File
err error
)

nextPipe = (*win32File)(atomic.LoadPointer(&l.nextPipe))

if nextPipe == nil {
responseCh <- acceptResponse{nil, nextErr}

l.nextLock.Lock()
nextPipe = (*win32File)(atomic.LoadPointer(&l.nextPipe))
if nextPipe == nil {
nextPipe, nextErr = l.makeServerPipe()
atomic.StorePointer(&l.nextPipe, unsafe.Pointer(nextPipe))
// l.nextPipe, nextErr = l.makeServerPipe()
}
l.nextLock.Unlock()
continue
}
for {
p, err = l.makeConnectedServerPipe()
err = l.connectServerPipe(nextPipe)
// If the connection was immediately closed by the client, try
// again.
if err != cERROR_NO_DATA {
break
}
}
responseCh <- acceptResponse{p, err}
closed = err == ErrPipeListenerClosed
p := nextPipe
if !closed {
l.nextLock.Lock()
nextPipe, nextErr = l.makeServerPipe()
atomic.StorePointer(&l.nextPipe, unsafe.Pointer(nextPipe))
l.nextLock.Unlock()
}
responseCh <- acceptResponse{p, err}
}
}
syscall.Close(l.firstHandle)
l.firstHandle = 0
// Notify Close() and Accept() callers that the handle has been closed.
close(l.doneCh)
l.nextLock.Lock()
defer l.nextLock.Unlock()
if l.nextPipe != nil {
nextPipe := (*win32File)(atomic.LoadPointer(&l.nextPipe))
nextPipe.Close()
atomic.StorePointer(&l.nextPipe, nil)
}
}

// PipeConfig contain configuration for the pipe listener.
Expand Down Expand Up @@ -345,20 +438,12 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
return nil, err
}
}
h, err := makeServerPipeHandle(path, sd, c, true)
p, err := makeServerPipeFirst(path, sd, c)
if err != nil {
return nil, err
}
// Immediately open and then close a client handle so that the named pipe is
// created but not currently accepting connections.
h2, err := createFile(path, 0, 0, nil, syscall.OPEN_EXISTING, cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
if err != nil {
syscall.Close(h)
return nil, err
}
syscall.Close(h2)
l := &win32PipeListener{
firstHandle: h,
nextPipe: unsafe.Pointer(p),
path: path,
securityDescriptor: sd,
config: *c,
Expand Down Expand Up @@ -396,10 +481,14 @@ func (l *win32PipeListener) Accept() (net.Conn, error) {
}
if l.config.MessageMode {
return &win32MessageBytePipe{
win32Pipe: win32Pipe{win32File: response.f, path: l.path},
win32Pipe: win32Pipe{
win32File: response.f,
path: l.path,
listener: l,
},
}, nil
}
return &win32Pipe{win32File: response.f, path: l.path}, nil
return &win32Pipe{win32File: response.f, path: l.path, listener: l}, nil
case <-l.doneCh:
return nil, ErrPipeListenerClosed
}
Expand Down
24 changes: 24 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ func TestDialListenerTimesOut(t *testing.T) {
t.Fatal(err)
}
defer l.Close()
// listener.Listen() always keeps a pipe instance open. Whether
// anyone can connect to it is a matter of whether someone has
// already connected to it, or if so, if the server has Accept()ed
// the connection and has opened a new pipe instance for future
// connections. So, in order to properly test timeouts, we need
// to establish a "blocker" connection that is not Accept()ed,
// blocking future connection attempts in the process.
blocker, err := DialPipe(testPipeName, nil)
if err != nil {
t.Fatal(err)
}
defer blocker.Close()
var d = time.Duration(10 * time.Millisecond)
_, err = DialPipe(testPipeName, &d)
if err != ErrTimeout {
Expand Down Expand Up @@ -260,6 +272,18 @@ func TestDialTimesOutByDefault(t *testing.T) {
t.Fatal(err)
}
defer l.Close()
// listener.Listen() always keeps a pipe instance open. Whether
// anyone can connect to it is a matter of whether someone has
// already connected to it, or if so, if the server has Accept()ed
// the connection and has opened a new pipe instance for future
// connections. So, in order to properly test timeouts, we need
// to establish a "blocker" connection that is not Accept()ed,
// blocking future connection attempts in the process.
blocker, err := DialPipe(testPipeName, nil)
if err != nil {
t.Fatal(err)
}
defer blocker.Close()
_, err = DialPipe(testPipeName, nil)
if err != ErrTimeout {
t.Fatalf("expected ErrTimeout, got %v", err)
Expand Down
13 changes: 13 additions & 0 deletions zsyscall_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
procGetQueuedCompletionStatus = modkernel32.NewProc("GetQueuedCompletionStatus")
procSetFileCompletionNotificationModes = modkernel32.NewProc("SetFileCompletionNotificationModes")
procConnectNamedPipe = modkernel32.NewProc("ConnectNamedPipe")
procDisconnectNamedPipe = modkernel32.NewProc("DisconnectNamedPipe")
procCreateNamedPipeW = modkernel32.NewProc("CreateNamedPipeW")
procCreateFileW = modkernel32.NewProc("CreateFileW")
procWaitNamedPipeW = modkernel32.NewProc("WaitNamedPipeW")
Expand Down Expand Up @@ -132,6 +133,18 @@ func connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) {
return
}

func disconnectNamedPipe(pipe syscall.Handle) (err error) {
r1, _, e1 := syscall.Syscall(procDisconnectNamedPipe.Addr(), 1, uintptr(pipe), 0, 0)
if r1 == 0 {
if e1 != 0 {
err = errnoErr(e1)
} else {
err = syscall.EINVAL
}
}
return
}

func createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) {
var _p0 *uint16
_p0, err = syscall.UTF16PtrFromString(name)
Expand Down