Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/cmd/api/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ type ApiService struct {
monitorMu sync.Mutex
lifecycleCtx context.Context
lifecycleCancel context.CancelFunc

// Reader for durable S2 telemetry storage. Nil when S2 is not configured.
telemetryReader *events.S2Reader
}

// s2Enabled reports whether durable S2 telemetry storage is configured.
func (s *ApiService) s2Enabled() bool {
return s.telemetryReader != nil
}

var _ oapi.StrictServerInterface = (*ApiService)(nil)
Expand All @@ -105,6 +113,7 @@ func New(
telemetrySession *telemetry.TelemetrySession,
eventStream *events.EventStream,
displayNum int,
telemetryReader *events.S2Reader,
) (*ApiService, error) {
switch {
case recordManager == nil:
Expand Down Expand Up @@ -140,6 +149,7 @@ func New(
cdpMonitor: mon,
lifecycleCtx: ctx,
lifecycleCancel: cancel,
telemetryReader: telemetryReader,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func newTelemetrySession(t *testing.T) (*telemetry.TelemetrySession, *events.Eve
func newSvc(t *testing.T, mgr recorder.RecordManager) (*ApiService, error) {
t.Helper()
ts, es := newTelemetrySession(t)
return New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0)
return New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0, nil)
}

func TestApiService_PatchChromiumFlags(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/display_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact
func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService {
t.Helper()
ts, es := newTelemetrySession(t)
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0)
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0, nil)
require.NoError(t, err)
return svc
}
Expand Down
118 changes: 118 additions & 0 deletions server/cmd/api/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/kernel/kernel-images/server/lib/events"
"github.com/kernel/kernel-images/server/lib/logger"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
)

Expand Down Expand Up @@ -136,6 +137,123 @@ func (s *ApiService) StreamTelemetryEvents(ctx context.Context, req oapi.StreamT
return oapi.StreamTelemetryEvents200TexteventStreamResponse{Body: pr, Headers: headers}, nil
}

const (
// defaultReadWindow bounds a read that supplies no since/until.
defaultReadWindow = 5 * time.Minute
// defaultPageSize / maxPageSize bound how many records one page reads.
defaultPageSize = 100
maxPageSize = 1000
)

// ReadTelemetryEvents handles GET /telemetry/events.
// Reads one page of archived telemetry envelopes for this browser from durable
// S2 storage in ascending sequence order, applying the category filter. The
// X-Has-More / X-Next-Offset response headers carry the pagination cursor.
// Returns an empty list when S2 storage is not configured.
func (s *ApiService) ReadTelemetryEvents(ctx context.Context, req oapi.ReadTelemetryEventsRequestObject) (oapi.ReadTelemetryEventsResponseObject, error) {
log := logger.FromContext(ctx)

if !s.s2Enabled() {
return readTelemetryEventsOKResponse{}, nil
}

result, err := s.telemetryReader.Read(ctx, buildReadOptions(req.Params), log)
if err != nil {
log.Error("failed to read telemetry events from S2", "err", err)
return oapi.ReadTelemetryEvents500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to read telemetry events"}}, nil
}

// has_more / next cursor track the raw stream position, independent of the
// category filter, so a filtered page may come back empty while more remain.
envs := filterByCategory(result.Envelopes, req.Params.Category)

return readTelemetryEventsOKResponse{envs: envs, nextSeqNum: result.NextSeqNum, hasMore: result.HasMore}, nil
}

// buildReadOptions maps query params to a bounded, paginated read. offset is the
// S2 sequence cursor and takes precedence over since as the start position (the
// two are mutually exclusive starts); until bounds the window end; the page is
// bounded to limit records.
func buildReadOptions(p oapi.ReadTelemetryEventsParams) events.ReadOptions {
var opts events.ReadOptions

switch {
case p.Offset != nil && *p.Offset >= 0:
seq := uint64(*p.Offset)
opts.SeqNum = &seq
case p.Since != nil:
ts := uint64(*p.Since)
opts.Timestamp = &ts
default:
ts := uint64(time.Now().Add(-defaultReadWindow).UnixMilli())
opts.Timestamp = &ts
}

if p.Until != nil {
until := uint64(*p.Until)
opts.Until = &until
}

count := uint64(pageSize(p.Limit))
opts.Count = &count
return opts
}

// pageSize clamps a requested limit into [1, maxPageSize], defaulting when unset.
func pageSize(limit *int) int {
switch {
case limit == nil:
return defaultPageSize
case *limit < 1:
return 1
case *limit > maxPageSize:
return maxPageSize
default:
return *limit
}
}

func filterByCategory(envs []events.Envelope, cats *[]oapi.TelemetryEventCategory) []events.Envelope {
if cats == nil || len(*cats) == 0 {
return envs
}
want := make(map[oapi.TelemetryEventCategory]struct{}, len(*cats))
for _, c := range *cats {
want[c] = struct{}{}
}
out := make([]events.Envelope, 0, len(envs))
for _, e := range envs {
if _, ok := want[e.Event.Category]; ok {
out = append(out, e)
}
}
return out
}

// readTelemetryEventsOKResponse serializes a page of events.Envelope directly,
// matching the SSE stream and publish endpoints. The pagination cursor rides in
// the X-Has-More / X-Next-Offset headers (X-Next-Offset only when there is more),
// following the offset_pagination convention used by the list endpoints.
type readTelemetryEventsOKResponse struct {
envs []events.Envelope
nextSeqNum uint64
hasMore bool
}

func (r readTelemetryEventsOKResponse) VisitReadTelemetryEventsResponse(w http.ResponseWriter) error {
w.Header().Set("X-Has-More", strconv.FormatBool(r.hasMore))
if r.hasMore {
w.Header().Set("X-Next-Offset", strconv.FormatUint(r.nextSeqNum, 10))
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
envs := r.envs
if envs == nil {
envs = []events.Envelope{}
}
return json.NewEncoder(w).Encode(envs)
}

// publishTelemetryEventOKResponse serializes events.Envelope directly so the response
// is identical in shape to the SSE stream frames.
type publishTelemetryEventOKResponse struct{ env events.Envelope }
Expand Down
73 changes: 73 additions & 0 deletions server/cmd/api/api/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bufio"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -296,3 +298,74 @@ func TestStreamResumeAfterLastEventIDUnchanged(t *testing.T) {
id := streamFirstID(t, svc, oapi.StreamTelemetryEventsParams{LastEventID: ptrOf("5")})
assert.Equal(t, uint64(6), id, "Last-Event-ID without replay must behave as before and resume after seq 5")
}

func TestReadTelemetryEventsS2Disabled(t *testing.T) {
t.Parallel()
svc := newTestService(t, newMockRecordManager()) // s2 creds empty -> disabled

resp, err := svc.ReadTelemetryEvents(context.Background(), oapi.ReadTelemetryEventsRequestObject{})
require.NoError(t, err)
ok, isOK := resp.(readTelemetryEventsOKResponse)
require.True(t, isOK, "expected 200 response when S2 is disabled")

rec := httptest.NewRecorder()
require.NoError(t, ok.VisitReadTelemetryEventsResponse(rec))
assert.Equal(t, http.StatusOK, rec.Code)
// Empty result must serialize as [] not null, or the Python SDK chokes.
assert.JSONEq(t, `[]`, rec.Body.String())
assert.Equal(t, "false", rec.Header().Get("X-Has-More"))
assert.Empty(t, rec.Header().Get("X-Next-Offset"), "no cursor when there is no more")
}

func TestFilterByCategory(t *testing.T) {
t.Parallel()
mk := func(c oapi.TelemetryEventCategory) events.Envelope {
return events.Envelope{Event: events.Event{Category: c}}
}
envs := []events.Envelope{mk(events.Console), mk(events.Network), mk(events.Console)}

assert.Len(t, filterByCategory(envs, nil), 3, "nil filter keeps everything")

cats := []oapi.TelemetryEventCategory{events.Console}
assert.Len(t, filterByCategory(envs, &cats), 2)
}

func TestPageSize(t *testing.T) {
t.Parallel()
assert.Equal(t, defaultPageSize, pageSize(nil), "unset defaults")

ptr := func(n int) *int { return &n }
assert.Equal(t, 50, pageSize(ptr(50)))
assert.Equal(t, 1, pageSize(ptr(0)), "clamped up to 1")
assert.Equal(t, 1, pageSize(ptr(-5)), "clamped up to 1")
assert.Equal(t, maxPageSize, pageSize(ptr(5000)), "clamped down to max")
}

func TestBuildReadOptions(t *testing.T) {
t.Parallel()

// No params: defaults the start to ~defaultReadWindow ago, no end bound, default page.
opts := buildReadOptions(oapi.ReadTelemetryEventsParams{})
require.NotNil(t, opts.Timestamp)
assert.Nil(t, opts.SeqNum)
assert.Nil(t, opts.Until)
require.NotNil(t, opts.Count)
assert.Equal(t, uint64(defaultPageSize), *opts.Count)

// since/until/limit map through; limit bounds the page.
since, until, limit := int64(1000), int64(2000), 25
opts = buildReadOptions(oapi.ReadTelemetryEventsParams{Since: &since, Until: &until, Limit: &limit})
require.NotNil(t, opts.Timestamp)
assert.Equal(t, uint64(1000), *opts.Timestamp)
require.NotNil(t, opts.Until)
assert.Equal(t, uint64(2000), *opts.Until)
require.NotNil(t, opts.Count)
assert.Equal(t, uint64(25), *opts.Count)

// offset is the cursor and takes precedence over since (SeqNum start, no Timestamp).
offset := int64(4213)
opts = buildReadOptions(oapi.ReadTelemetryEventsParams{Offset: &offset, Since: &since})
require.NotNil(t, opts.SeqNum)
assert.Equal(t, uint64(4213), *opts.SeqNum)
assert.Nil(t, opts.Timestamp, "since is ignored when offset is set")
}
12 changes: 12 additions & 0 deletions server/cmd/api/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func DisableTelemetryMiddleware() { telemetryMiddlewareEnabled.Store(false) }
// TelemetryMiddlewareEnabled reports the current state.
func TelemetryMiddlewareEnabled() bool { return telemetryMiddlewareEnabled.Load() }

// telemetryReadOps are the telemetry-observer endpoints that must not emit their
// own api_call event: doing so would append to the very stream they read, a
// feedback loop that (e.g. at page size 1) prevents a paginated read from ever
// catching the tail.
var telemetryReadOps = map[string]struct{}{
"ReadTelemetryEvents": {},
"StreamTelemetryEvents": {},
}

// TelemetryHTTPMiddleware emits a BrowserApiCallEvent per documented operation,
// capturing the final status and wall-clock duration. publish is wired to
// TelemetrySession.Publish; the middleware ignores the returns.
Expand All @@ -55,6 +64,9 @@ func TelemetryHTTPMiddleware(publish func(events.Event) (events.Envelope, bool))
if tc.operationID == "" {
return
}
if _, skip := telemetryReadOps[tc.operationID]; skip {
return
}
data, _ := json.Marshal(oapi.BrowserApiCallEventData{
RequestId: chiMiddleware.GetReqID(ctx),
OperationId: tc.operationID,
Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/api/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (m *mockRecordManager) StopAll(_ context.Context) error
func newTestService(t *testing.T, mgr recorder.RecordManager) *ApiService {
t.Helper()
ts, es := newTelemetrySession(t)
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0)
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), ts, es, 0, nil)
require.NoError(t, err)
svc.cdpMonitor = &stubCdpMonitor{}
return svc
Expand Down
6 changes: 5 additions & 1 deletion server/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func main() {
var s2Writer *events.S2StorageWriter
if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" {
slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream)
s2Writer = events.NewS2StorageWriter(eventStream, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}, slogger)
s2Writer = events.NewS2StorageWriter(eventStream, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{
BatcherLinger: config.S2BatcherLinger,
BatcherMaxRecords: config.S2BatcherMaxRecords,
}, slogger)
if err := s2Writer.Start(ctx); err != nil {
slogger.Error("failed to start S2 storage writer", "err", err)
os.Exit(1)
Expand All @@ -134,6 +137,7 @@ func main() {
telemetrySession,
eventStream,
config.DisplayNum,
events.NewS2Reader(config.S2Basin, config.S2AccessToken, config.S2Stream),
)
if err != nil {
slogger.Error("failed to create api service", "err", err)
Expand Down
5 changes: 5 additions & 0 deletions server/cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Config struct {
S2Basin string `envconfig:"S2_BASIN" default:""`
S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""`
S2Stream string `envconfig:"S2_STREAM" default:""`
// S2 batcher tuning for the append path.
S2BatcherLinger time.Duration `envconfig:"S2_BATCHER_LINGER" default:"100ms"`
S2BatcherMaxRecords int `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"`
}

// LogValue implements slog.LogValuer, redacting secret fields.
Expand All @@ -73,6 +76,8 @@ func (c *Config) LogValue() slog.Value {
slog.String("s2_basin", c.S2Basin),
slog.String("s2_access_token", s2AccessToken),
slog.String("s2_stream", c.S2Stream),
slog.Duration("s2_batcher_linger", c.S2BatcherLinger),
slog.Int("s2_batcher_max_records", c.S2BatcherMaxRecords),
)
}

Expand Down
8 changes: 8 additions & 0 deletions server/cmd/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func TestLoad(t *testing.T) {
ChromeDriverProxyPort: 9224,
ChromeDriverUpstreamAddr: "127.0.0.1:9225",
DevToolsProxyAddr: "127.0.0.1:9222",
S2BatcherLinger: 100 * time.Millisecond,
S2BatcherMaxRecords: 50,
},
},
{
Expand All @@ -48,6 +50,8 @@ func TestLoad(t *testing.T) {
"SCALE_TO_ZERO_COOLDOWN": "5s",
"CHROMEDRIVER_PROXY_PORT": "5432",
"CHROMEDRIVER_UPSTREAM_ADDR": "127.0.0.1:9999",
"S2_BATCHER_LINGER": "250ms",
"S2_BATCHER_MAX_RECORDS": "100",
},
wantCfg: &Config{
Port: 12345,
Expand All @@ -63,6 +67,8 @@ func TestLoad(t *testing.T) {
ChromeDriverProxyPort: 5432,
ChromeDriverUpstreamAddr: "127.0.0.1:9999",
DevToolsProxyAddr: "127.0.0.1:9876",
S2BatcherLinger: 250 * time.Millisecond,
S2BatcherMaxRecords: 100,
},
},
{
Expand All @@ -85,6 +91,8 @@ func TestLoad(t *testing.T) {
ChromeDriverProxyPort: 9224,
ChromeDriverUpstreamAddr: "127.0.0.1:9225",
DevToolsProxyAddr: "10.0.0.1:1234",
S2BatcherLinger: 100 * time.Millisecond,
S2BatcherMaxRecords: 50,
},
},
{
Expand Down
Loading
Loading