Skip to content

Commit

Permalink
types: add AnyDisconnected() method on ConnectivityGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
c9s committed Sep 27, 2024
1 parent c07661a commit fb35a2b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
28 changes: 25 additions & 3 deletions pkg/types/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,36 @@ func (g *ConnectivityGroup) Add(con *Connectivity) {
g.connections = append(g.connections, con)
}

func (g *ConnectivityGroup) AnyDisconnected(ctx context.Context) bool {
g.mu.Lock()
conns := g.connections
g.mu.Unlock()

for _, conn := range conns {
select {
case <-ctx.Done():
return false

case <-conn.connectedC:
continue

case <-conn.disconnectedC:
return true
}
}

return false
}

func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}, allTimeoutDuration time.Duration) {
g.mu.Lock()
defer g.mu.Unlock()
conns := g.connections
g.mu.Unlock()

authedConns := make([]bool, len(g.connections))
authedConns := make([]bool, len(conns))
allTimeout := time.After(allTimeoutDuration)
for {
for idx, con := range g.connections {
for idx, con := range conns {
// if the connection is not authed, mark it as false
if !con.authed {
// authedConns[idx] = false
Expand Down
9 changes: 7 additions & 2 deletions pkg/types/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,21 @@ func TestConnectivity(t *testing.T) {

func TestConnectivityGroupAuthC(t *testing.T) {
timeout := 100 * time.Millisecond
delay := timeout * 2

ctx := context.Background()
conn1 := NewConnectivity()
conn2 := NewConnectivity()
group := NewConnectivityGroup(conn1, conn2)
allAuthedC := group.AllAuthedC(ctx, time.Second)

time.Sleep(timeout)
time.Sleep(delay)
conn1.handleConnect()
assert.True(t, waitSigChan(conn1.ConnectedC(), timeout))
conn1.handleAuth()
assert.True(t, waitSigChan(conn1.AuthedC(), timeout))

time.Sleep(timeout)
time.Sleep(delay)
conn2.handleConnect()
assert.True(t, waitSigChan(conn2.ConnectedC(), timeout))

Expand Down Expand Up @@ -82,10 +83,14 @@ func TestConnectivityGroupReconnect(t *testing.T) {

assert.True(t, waitSigChan(group.AllAuthedC(ctx, time.Second), timeout), "all connections are authenticated")

assert.False(t, group.AnyDisconnected(ctx))

// this should re-allocate authedC
conn1.handleDisconnect()
assert.NotEqual(t, conn1authC, conn1.authedC)

assert.True(t, group.AnyDisconnected(ctx))

assert.False(t, waitSigChan(group.AllAuthedC(ctx, time.Second), timeout), "one connection should be un-authed")

time.Sleep(delay)
Expand Down

0 comments on commit fb35a2b

Please sign in to comment.