Skip to content

Commit a9e159b

Browse files
authored
fix race condition in httpcluster (#41)
* rework several things in the httpcluster to fix a race condition. This change also includes many adjustments to tests. * reduce bench time limit * adjust the benchmark test code
1 parent be893b3 commit a9e159b

File tree

12 files changed

+909
-346
lines changed

12 files changed

+909
-346
lines changed

Makefile

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
# Variables
2-
PACKAGES := $(shell go list ./...)
3-
41
.PHONY: all
52
all: help
63

@@ -16,12 +13,12 @@ help: Makefile
1613
## test: Run tests with race detection and coverage
1714
.PHONY: test
1815
test:
19-
go test -timeout 3m -race -cover $(PACKAGES)
16+
go test -timeout 3m -race -cover ./...
2017

2118
## bench: Run performance benchmarks
2219
.PHONY: bench
2320
bench:
24-
go test -run=^$$ -bench=. -benchmem $(PACKAGES)
21+
go test -timeout 2m -run=^$$ -bench=. -benchmem ./...
2522

2623
## lint: Run golangci-lint code quality checks
2724
.PHONY: lint

internal/networking/portfinder.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package networking
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"sync"
7+
"testing"
8+
)
9+
10+
// Package-level state shared by the port helpers to reduce the chance of port
// conflicts between callers in the same process.
11+
var (
12+
// portMutex is held by GetRandomPort while it allocates a port and guards usedPorts.
portMutex = &sync.Mutex{}
13+
// usedPorts records every port GetRandomPort has already returned, so that the
// same port is not handed out twice.
usedPorts = make(map[int]struct{})
14+
)
15+
16+
// GetRandomPort finds an available port for a test by binding to port 0
17+
func GetRandomPort(tb testing.TB) int {
18+
tb.Helper()
19+
portMutex.Lock()
20+
defer portMutex.Unlock()
21+
listener, err := net.Listen("tcp", ":0")
22+
if err != nil {
23+
tb.Fatalf("Failed to get random port: %v", err)
24+
}
25+
26+
err = listener.Close()
27+
if err != nil {
28+
tb.Fatalf("Failed to close listener: %v", err)
29+
}
30+
31+
addr := listener.Addr().(*net.TCPAddr)
32+
p := addr.Port
33+
// Check if the port is already used
34+
if _, ok := usedPorts[p]; ok {
35+
return GetRandomPort(tb)
36+
}
37+
usedPorts[p] = struct{}{}
38+
return p
39+
}
40+
41+
// GetRandomListeningPort finds an available port for a test by binding to port 0, and returns a string like localhost:PORT
42+
func GetRandomListeningPort(tb testing.TB) string {
43+
tb.Helper()
44+
p := GetRandomPort(tb)
45+
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", p))
46+
if err != nil {
47+
return GetRandomListeningPort(tb)
48+
}
49+
err = listener.Close()
50+
if err != nil {
51+
tb.Fatalf("Failed to close listener: %v", err)
52+
}
53+
54+
return fmt.Sprintf("localhost:%d", p)
55+
}

runnables/httpcluster/entries.go

Lines changed: 73 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package httpcluster
33
import (
44
"context"
55
"iter"
6-
"log/slog"
76
"maps"
87

98
"github.com/robbyt/go-supervisor/runnables/httpserver"
@@ -41,42 +40,42 @@ type entries struct {
4140
servers map[string]*serverEntry
4241
}
4342

44-
// newEntries creates a new entries collection from the previous state and desired configuration.
45-
// Each entry is marked with the action needed during the commit phase.
46-
func newEntries(
47-
previous *entries,
48-
desiredConfigs map[string]*httpserver.Config,
49-
logger *slog.Logger,
50-
) *entries {
51-
logger = logger.WithGroup("newEntries")
43+
// newEntries creates a new entries collection from desired configuration.
44+
// This is used for initial creation only.
45+
func newEntries(desiredConfigs map[string]*httpserver.Config) *entries {
5246
servers := make(map[string]*serverEntry)
5347

54-
// Process existing servers (mark for stop, or update)
55-
if previous != nil {
56-
for id, oldEntry := range previous.servers {
57-
maps.Insert(servers, processExistingServer(id, oldEntry, desiredConfigs[id], logger))
58-
}
59-
}
60-
61-
// Process new servers
6248
for id, config := range desiredConfigs {
6349
if config == nil {
6450
continue
6551
}
66-
if previous == nil || previous.servers[id] == nil {
67-
// New server
68-
servers[id] = &serverEntry{
69-
id: id,
70-
config: config,
71-
action: actionStart,
72-
}
73-
logger.Debug("New server marked for start", "id", id)
52+
servers[id] = &serverEntry{
53+
id: id,
54+
config: config,
55+
action: actionStart,
7456
}
7557
}
7658

7759
return &entries{servers: servers}
7860
}
7961

62+
// removeEntry creates a new entries collection with the specified entry completely removed.
63+
func (e *entries) removeEntry(id string) entriesManager {
64+
_, exists := e.servers[id]
65+
if !exists {
66+
return e
67+
}
68+
69+
newServers := make(map[string]*serverEntry, len(e.servers)-1)
70+
for k, v := range e.servers {
71+
if k != id {
72+
newServers[k] = v
73+
}
74+
}
75+
76+
return &entries{servers: newServers}
77+
}
78+
8079
// getPendingActions returns lists of server IDs grouped by their pending action.
8180
func (e *entries) getPendingActions() (toStart, toStop []string) {
8281
for id, entry := range e.servers {
@@ -118,20 +117,18 @@ func (e *entries) commit() entriesManager {
118117
servers := make(map[string]*serverEntry)
119118

120119
for id, entry := range e.servers {
121-
switch entry.action {
122-
case actionStop:
120+
if entry.action == actionStop {
123121
// Don't copy stopped servers
124122
continue
125-
default:
126-
// Copy entry with action cleared
127-
servers[id] = &serverEntry{
128-
id: entry.id,
129-
config: entry.config,
130-
runner: entry.runner,
131-
ctx: entry.ctx,
132-
cancel: entry.cancel,
133-
action: actionNone,
134-
}
123+
}
124+
// Copy entry with action cleared
125+
servers[id] = &serverEntry{
126+
id: entry.id,
127+
config: entry.config,
128+
runner: entry.runner,
129+
ctx: entry.ctx,
130+
cancel: entry.cancel,
131+
action: actionNone,
135132
}
136133
}
137134

@@ -209,11 +206,9 @@ func processExistingServer(
209206
id string,
210207
oldEntry *serverEntry,
211208
desiredConfig *httpserver.Config,
212-
logger *slog.Logger,
213209
) iter.Seq2[string, *serverEntry] {
214210
return func(yield func(string, *serverEntry) bool) {
215211
if oldEntry == nil {
216-
logger.Warn("Old entry object is nil", "id", id)
217212
return
218213
}
219214

@@ -222,7 +217,6 @@ func processExistingServer(
222217
if oldEntry.runner != nil {
223218
newEntry := *oldEntry
224219
newEntry.action = actionStop
225-
logger.Debug("Server marked for stop", "id", id)
226220
yield(id, &newEntry)
227221
}
228222
// If runner is nil, server was never started, so skip it
@@ -233,7 +227,6 @@ func processExistingServer(
233227
if oldEntry.config.Equal(desiredConfig) {
234228
newEntry := *oldEntry
235229
newEntry.action = actionNone
236-
logger.Debug("Server unchanged", "id", id)
237230
yield(id, &newEntry)
238231
return
239232
}
@@ -254,17 +247,54 @@ func processExistingServer(
254247
action: actionStart,
255248
}
256249

257-
logger.Debug("Server marked for restart", "id", id)
258250
yield(id, startEntry)
259251
return
260252
}
261253

262254
// Not running, just start with new config
263-
logger.Debug("Server marked for start", "id", id)
264255
yield(id, &serverEntry{
265256
id: id,
266257
config: desiredConfig,
267258
action: actionStart,
268259
})
269260
}
270261
}
262+
263+
// buildPendingEntries creates a new entries collection based on the desired state.
264+
// It uses the current entries as the previous state and applies the same logic as newEntries.
265+
func (e *entries) buildPendingEntries(desired entriesManager) entriesManager {
266+
// Extract configs from the desired entries
267+
desiredEntries, ok := desired.(*entries)
268+
if !ok {
269+
return e
270+
}
271+
272+
desiredConfigs := make(map[string]*httpserver.Config)
273+
for id, entry := range desiredEntries.servers {
274+
desiredConfigs[id] = entry.config
275+
}
276+
277+
servers := make(map[string]*serverEntry)
278+
279+
// Process existing servers (mark for stop, or update)
280+
for id, oldEntry := range e.servers {
281+
maps.Insert(servers, processExistingServer(id, oldEntry, desiredConfigs[id]))
282+
}
283+
284+
// Process new servers
285+
for id, config := range desiredConfigs {
286+
if config == nil {
287+
continue
288+
}
289+
if e.servers[id] == nil {
290+
// New server
291+
servers[id] = &serverEntry{
292+
id: id,
293+
config: config,
294+
action: actionStart,
295+
}
296+
}
297+
}
298+
299+
return &entries{servers: servers}
300+
}

0 commit comments

Comments
 (0)