Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions tavern/internal/http/stream/gcp_coldstart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream_test

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -16,18 +17,19 @@ func TestPreventPubSubColdStarts_ValidInterval(t *testing.T) {
defer cancel()

// Create a mock topic and subscription.
topic, err := pubsub.OpenTopic(ctx, "mem://valid")
topicName := fmt.Sprintf("mem://valid-%d", time.Now().UnixNano())
topic, err := pubsub.OpenTopic(ctx, topicName)
if err != nil {
t.Fatalf("Failed to open topic: %v", err)
}
defer topic.Shutdown(ctx)
sub, err := pubsub.OpenSubscription(ctx, "mem://valid")
sub, err := pubsub.OpenSubscription(ctx, topicName)
if err != nil {
t.Fatalf("Failed to open subscription: %v", err)
}
defer sub.Shutdown(ctx)

go stream.PreventPubSubColdStarts(ctx, 50*time.Millisecond, "mem://valid", "mem://valid")
go stream.PreventPubSubColdStarts(ctx, 50*time.Millisecond, topicName, topicName)

// Expect to receive a message
msg, err := sub.Receive(ctx)
Expand All @@ -43,18 +45,19 @@ func TestPreventPubSubColdStarts_ZeroInterval(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

topic, err := pubsub.OpenTopic(ctx, "mem://zero")
topicName := fmt.Sprintf("mem://zero-%d", time.Now().UnixNano())
topic, err := pubsub.OpenTopic(ctx, topicName)
if err != nil {
t.Fatalf("Failed to open topic: %v", err)
}
defer topic.Shutdown(ctx)
sub, err := pubsub.OpenSubscription(ctx, "mem://zero")
sub, err := pubsub.OpenSubscription(ctx, topicName)
if err != nil {
t.Fatalf("Failed to open subscription: %v", err)
}
defer sub.Shutdown(ctx)

go stream.PreventPubSubColdStarts(ctx, 0, "mem://zero", "mem://zero")
go stream.PreventPubSubColdStarts(ctx, 0, topicName, topicName)

// Expect to not receive a message and for the context to timeout
_, err = sub.Receive(ctx)
Expand All @@ -66,18 +69,19 @@ func TestPreventPubSubColdStarts_SubMillisecondInterval(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

topic, err := pubsub.OpenTopic(ctx, "mem://sub")
topicName := fmt.Sprintf("mem://sub-%d", time.Now().UnixNano())
topic, err := pubsub.OpenTopic(ctx, topicName)
if err != nil {
t.Fatalf("Failed to open topic: %v", err)
}
defer topic.Shutdown(ctx)
sub, err := pubsub.OpenSubscription(ctx, "mem://sub")
sub, err := pubsub.OpenSubscription(ctx, topicName)
if err != nil {
t.Fatalf("Failed to open subscription: %v", err)
}
defer sub.Shutdown(ctx)

go stream.PreventPubSubColdStarts(ctx, 1*time.Microsecond, "mem://sub", "mem://sub")
go stream.PreventPubSubColdStarts(ctx, 1*time.Microsecond, topicName, topicName)

// Expect to receive a message
msg, err := sub.Receive(ctx)
Expand Down
9 changes: 4 additions & 5 deletions tavern/internal/http/stream/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream_test

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -17,10 +18,11 @@ func TestMux(t *testing.T) {
defer cancel()

// Setup Topic and Subscription
topic, err := pubsub.OpenTopic(ctx, "mem://mux-test")
topicName := fmt.Sprintf("mem://mux-test-%d", time.Now().UnixNano())
topic, err := pubsub.OpenTopic(ctx, topicName)
require.NoError(t, err)
defer topic.Shutdown(ctx)
sub, err := pubsub.OpenSubscription(ctx, "mem://mux-test")
sub, err := pubsub.OpenSubscription(ctx, topicName)
require.NoError(t, err)
defer sub.Shutdown(ctx)

Expand All @@ -37,9 +39,6 @@ func TestMux(t *testing.T) {
mux.Register(stream2)
defer mux.Unregister(stream2)

// Give the mux a moment to register the streams
time.Sleep(50 * time.Millisecond)

// Send a message for stream1
err = topic.Send(ctx, &pubsub.Message{
Body: []byte("hello stream 1"),
Expand Down
5 changes: 3 additions & 2 deletions tavern/internal/http/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ func TestStream_SendMessage(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

topic, err := pubsub.OpenTopic(ctx, "mem://stream-test-send")
topicName := fmt.Sprintf("mem://stream-test-send-%d", time.Now().UnixNano())
topic, err := pubsub.OpenTopic(ctx, topicName)
require.NoError(t, err)
defer topic.Shutdown(ctx)
sub, err := pubsub.OpenSubscription(ctx, "mem://stream-test-send")
sub, err := pubsub.OpenSubscription(ctx, topicName)
require.NoError(t, err)
defer sub.Shutdown(ctx)

Expand Down
11 changes: 7 additions & 4 deletions tavern/internal/http/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream_test

import (
"context"
"fmt"
"net/http/httptest"
"strconv"
"strings"
Expand Down Expand Up @@ -29,18 +30,20 @@ func TestNewShellHandler(t *testing.T) {
defer cancel()

// Topic for messages going TO the websocket (server -> shell)
outputTopic, err := pubsub.OpenTopic(ctx, "mem://websocket-output")
outputTopicName := fmt.Sprintf("mem://websocket-output-%d", time.Now().UnixNano())
outputTopic, err := pubsub.OpenTopic(ctx, outputTopicName)
require.NoError(t, err)
defer outputTopic.Shutdown(ctx)
outputSub, err := pubsub.OpenSubscription(ctx, "mem://websocket-output")
outputSub, err := pubsub.OpenSubscription(ctx, outputTopicName)
require.NoError(t, err)
defer outputSub.Shutdown(ctx)

// Topic for messages coming FROM the websocket (shell -> server)
inputTopic, err := pubsub.OpenTopic(ctx, "mem://websocket-input")
inputTopicName := fmt.Sprintf("mem://websocket-input-%d", time.Now().UnixNano())
inputTopic, err := pubsub.OpenTopic(ctx, inputTopicName)
require.NoError(t, err)
defer inputTopic.Shutdown(ctx)
inputSub, err := pubsub.OpenSubscription(ctx, "mem://websocket-input")
inputSub, err := pubsub.OpenSubscription(ctx, inputTopicName)
require.NoError(t, err)
defer inputSub.Shutdown(ctx)

Expand Down
18 changes: 16 additions & 2 deletions tavern/internal/redirectors/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func TestRedirector_ContextCancellation(t *testing.T) {
serverErr <- redirector.Redirect(ctx, addr, upstreamConn)
}()

// Wait a moment for the server to start listening.
time.Sleep(100 * time.Millisecond)
// Wait for the server to start listening.
waitForServer(t, addr)

// Cancel the context, which should trigger GracefulStop.
cancel()
Expand Down Expand Up @@ -215,3 +215,17 @@ func TestRedirector_UpstreamFailure(t *testing.T) {
require.True(t, ok, "error should be a gRPC status error")
require.Equal(t, codes.Unavailable, s.Code(), "error code should be Unavailable")
}

func waitForServer(t *testing.T, addr string) {
t.Helper()
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
if err == nil {
conn.Close()
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("server did not start listening on %s", addr)
}
Loading