Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions tavern/internal/portals/mux/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package mux

import (
"context"
"crypto/rand"
"testing"

"github.com/stretchr/testify/require"
"realm.pub/tavern/internal/c2/c2pb"
"realm.pub/tavern/internal/ent/enttest"
"realm.pub/tavern/internal/ent/tome"
"realm.pub/tavern/portals/portalpb"
)

func BenchmarkMuxThroughput(b *testing.B) {
// Setup DB
client := enttest.Open(b, "sqlite3", "file:ent?mode=memory&cache=shared&_fk=1")
defer client.Close()

// Setup Mux
m := New(WithInMemoryDriver(), WithSubscriberBufferSize(1000))
ctx := context.Background()

// Setup Entities
u := client.User.Create().SetName("benchuser").SetOauthID("oauth").SetPhotoURL("photo").SaveX(ctx)
h := client.Host.Create().SetName("benchhost").SetIdentifier("ident").SetPlatform(c2pb.Host_PLATFORM_LINUX).SaveX(ctx)
beacon := client.Beacon.Create().SetName("benchbeacon").SetTransport(c2pb.Beacon_TRANSPORT_HTTP1).SetHost(h).SaveX(ctx)

tomeEnt := client.Tome.Create().SetName("benchtome").SetDescription("desc").SetAuthor(u.Name).SetUploader(u).SetTactic(tome.TacticRECON).SetEldritch("nop").SaveX(ctx)
quest := client.Quest.Create().SetName("benchquest").SetParameters("").SetCreator(u).SetTome(tomeEnt).SaveX(ctx)
task := client.Task.Create().SetQuest(quest).SetBeacon(beacon).SaveX(ctx)

// Setup Portals
// Host Side
portalID, teardownCreate, err := m.CreatePortal(ctx, client, task.ID)
require.NoError(b, err)
defer teardownCreate()

hostCh, cancelHostSub := m.Subscribe(m.TopicIn(portalID), WithHistoryReplay(false))
defer cancelHostSub()

// Client Side
teardownOpen, err := m.OpenPortal(ctx, portalID)
require.NoError(b, err)
defer teardownOpen()

clientCh, cancelClientSub := m.Subscribe(m.TopicOut(portalID), WithHistoryReplay(false))
defer cancelClientSub()

// Payload (64KB)
payloadSize := 64 * 1024
payloadData := make([]byte, payloadSize)
_, err = rand.Read(payloadData)
require.NoError(b, err)

mote := &portalpb.Mote{
Payload: &portalpb.Mote_Bytes{
Bytes: &portalpb.BytesPayload{
Data: payloadData,
Kind: portalpb.BytesPayloadKind_BYTES_PAYLOAD_KIND_DATA,
},
},
}

b.SetBytes(int64(payloadSize * 2)) // Bidirectional throughput
b.ResetTimer()

for i := 0; i < b.N; i++ {
// 1. Client sends to Host (TopicIn)
if err := m.Publish(ctx, m.TopicIn(portalID), mote); err != nil {
b.Fatalf("Failed to publish to TopicIn: %v", err)
}

// Host receives
select {
case <-hostCh:
// Success
case <-ctx.Done():
b.Fatal("Context cancelled")
}

// 2. Host sends to Client (TopicOut)
if err := m.Publish(ctx, m.TopicOut(portalID), mote); err != nil {
b.Fatalf("Failed to publish to TopicOut: %v", err)
}

// Client receives
select {
case <-clientCh:
// Success
case <-ctx.Done():
b.Fatal("Context cancelled")
}
}
}
54 changes: 54 additions & 0 deletions tavern/internal/portals/mux/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package mux

import (
"sync"

"realm.pub/tavern/portals/portalpb"
)

// HistoryBuffer is a circular buffer for storing recent messages.
type HistoryBuffer struct {
messages []*portalpb.Mote
capacity int
head int
mutex sync.RWMutex
}

// NewHistoryBuffer creates a new history buffer with the given capacity.
func NewHistoryBuffer(capacity int) *HistoryBuffer {
if capacity <= 0 {
capacity = 100 // Default
}
return &HistoryBuffer{
messages: make([]*portalpb.Mote, 0, capacity),
capacity: capacity,
}
}

// Add adds a message to the buffer.
func (h *HistoryBuffer) Add(msg *portalpb.Mote) {
h.mutex.Lock()
defer h.mutex.Unlock()

if len(h.messages) < h.capacity {
h.messages = append(h.messages, msg)
} else {
h.messages[h.head] = msg
h.head = (h.head + 1) % h.capacity
}
}

// Get returns all messages in the buffer in order.
func (h *HistoryBuffer) Get() []*portalpb.Mote {
h.mutex.RLock()
defer h.mutex.RUnlock()

result := make([]*portalpb.Mote, 0, len(h.messages))
if len(h.messages) < h.capacity {
result = append(result, h.messages...)
} else {
result = append(result, h.messages[h.head:]...)
result = append(result, h.messages[:h.head]...)
}
return result
}
Loading