Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions lazyClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package multistream
import (
"fmt"
"io"
"sync"
)

// NewMSSelect returns a new Multistream which is able to perform
Expand All @@ -12,6 +11,9 @@ func NewMSSelect[T StringLike](c io.ReadWriteCloser, proto T) LazyConn {
return &lazyClientConn[T]{
protos: []T{ProtocolID, proto},
con: c,

rhandshakeOnce: newOnce(),
whandshakeOnce: newOnce(),
}
}

Expand All @@ -22,7 +24,38 @@ func NewMultistream[T StringLike](c io.ReadWriteCloser, proto T) LazyConn {
return &lazyClientConn[T]{
protos: []T{proto},
con: c,

rhandshakeOnce: newOnce(),
whandshakeOnce: newOnce(),
}
}

// once is a sync.Once that can be used by synctest.
// For Multistream, it is a bit better than sync.Once because it doesn't
// spin when acquiring the lock.
type once struct {
sem chan struct{}
}

func newOnce() *once {
o := once{
sem: make(chan struct{}, 1),
}
o.sem <- struct{}{}
return &o
}

func (o *once) Do(f func()) {
// We only ever pull a single value from the channel. But we want to block
// Do until the first call to Do has completed. The first call will close
// the channel, so by checking if it's closed we know we don't need to do
// anything.
_, ok := <-o.sem
if !ok {
return
}
defer close(o.sem)
f()
}

// lazyClientConn is a ReadWriteCloser adapter that lazily negotiates a protocol
Expand All @@ -33,11 +66,11 @@ func NewMultistream[T StringLike](c io.ReadWriteCloser, proto T) LazyConn {
// See: https://github.com/multiformats/go-multistream/issues/20
type lazyClientConn[T StringLike] struct {
// Used to ensure we only trigger the write half of the handshake once.
rhandshakeOnce sync.Once
rhandshakeOnce *once
rerr error

// Used to ensure we only trigger the read half of the handshake once.
whandshakeOnce sync.Once
whandshakeOnce *once
werr error

// The sequence of protocols to negotiate.
Expand Down
25 changes: 25 additions & 0 deletions multistream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -938,3 +939,27 @@ func TestComparableErrors(t *testing.T) {
t.Fatalf("Should be read as ErrNotSupported")
}
}

func TestOnceFunc(t *testing.T) {
o := newOnce()
start := make(chan struct{})
var runCount int
var wg sync.WaitGroup
const workers = 3
wg.Add(workers)
for range workers {
go func() {
defer wg.Done()
<-start
o.Do(func() { runCount++ })
if runCount != 1 {
t.Errorf("Do returned before func was run")
}
}()
}
close(start)
wg.Wait()
if runCount != 1 {
t.Fatalf("should have run only once")
}
}