diff --git a/cmd/livepeer/starter/kafka.go b/cmd/livepeer/starter/kafka.go
index e6f9d8e27e..5c2250bc4b 100644
--- a/cmd/livepeer/starter/kafka.go
+++ b/cmd/livepeer/starter/kafka.go
@@ -6,7 +6,7 @@ import (
 )
 
 func startKafkaProducer(cfg LivepeerConfig) error {
-	if *cfg.KafkaBootstrapServers == "" || *cfg.KafkaUsername == "" || *cfg.KafkaPassword == "" || *cfg.KafkaGatewayTopic == "" {
+	if *cfg.KafkaBootstrapServers == "" || *cfg.KafkaGatewayTopic == "" {
 		glog.Warning("not starting Kafka producer as producer config values aren't present")
 		return nil
 	}
diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go
index 22a8540f46..20873e8064 100755
--- a/cmd/livepeer/starter/starter.go
+++ b/cmd/livepeer/starter/starter.go
@@ -1401,7 +1401,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
 				pricePerUnit = pricePerUnitBase
 				currency = currencyBase
 				glog.Warningf("No 'pricePerUnit' specified for model '%v' in pipeline '%v'. Using default value from `-pricePerUnit`: %v", config.ModelID, config.Pipeline, *cfg.PricePerUnit)
-			} else if pricePerUnit.Sign() <= 0 {
-				panic(fmt.Errorf("'pricePerUnit' value specified for model '%v' in pipeline '%v' must be a valid positive number, provided %v", config.ModelID, config.Pipeline, config.PricePerUnit))
+			} else if pricePerUnit.Sign() < 0 {
+				panic(fmt.Errorf("'pricePerUnit' value specified for model '%v' in pipeline '%v' must be a valid non-negative number, provided %v", config.ModelID, config.Pipeline, config.PricePerUnit))
 			}
 
diff --git a/monitor/kafka.go b/monitor/kafka.go
index bf31d293b4..4629d0a14d 100644
--- a/monitor/kafka.go
+++ b/monitor/kafka.go
@@ -66,15 +66,20 @@ func InitKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress s
 
 func newKafkaProducer(bootstrapServers, user, password, topic, gatewayAddress string) (*KafkaProducer, error) {
 	dialer := &kafka.Dialer{
-		Timeout: KafkaRequestTimeout,
-		SASLMechanism: plain.Mechanism{
-			Username: user,
-			Password: password,
-		},
+		Timeout:   KafkaRequestTimeout,
 		DualStack: true,
-		TLS: &tls.Config{
+	}
+
+	if user != "" && password != "" {
+		tlsConfig := &tls.Config{
 			MinVersion: tls.VersionTLS12,
-		},
+		}
+		sasl := &plain.Mechanism{
+			Username: user,
+			Password: password,
+		}
+		dialer.SASLMechanism = sasl
+		dialer.TLS = tlsConfig
 	}
 
 	writer := kafka.NewWriter(kafka.WriterConfig{
diff --git a/server/ai_http.go b/server/ai_http.go
index b39df225b1..432bb16fe9 100644
--- a/server/ai_http.go
+++ b/server/ai_http.go
@@ -145,6 +145,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 		subUrl     = pubUrl + "-out"
 		controlUrl = pubUrl + "-control"
 		eventsUrl  = pubUrl + "-events"
+		// dataUrl = pubUrl + "-data" // placeholder until the runner accepts a data URL
 	)
 
 	// Handle initial payment, the rest of the payments are done separately from the stream processing
@@ -180,6 +181,8 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 	controlPubCh.CreateChannel()
 	eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json")
 	eventsCh.CreateChannel()
+	dataCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/json")
+	dataCh.CreateChannel()
 
 	// Start payment receiver which accounts the payments and stops the stream if the payment is insufficient
 	priceInfo := payment.GetExpectedPrice()
@@ -235,6 +238,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 	eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl)
 	subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl)
 	publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl)
+	// dataUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl)
 
 	workerReq := worker.LiveVideoToVideoParams{
 		ModelId: req.ModelId,
@@ -259,6 +263,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
 		subCh.Close()
 		controlPubCh.Close()
 		eventsCh.Close()
+		dataCh.Close()
 		cancel()
 		respondWithError(w, err.Error(), http.StatusInternalServerError)
 		return
diff --git a/server/ai_live_video.go b/server/ai_live_video.go
index c69bb510b4..ade0d1eef3 100644
--- a/server/ai_live_video.go
+++ b/server/ai_live_video.go
@@ -669,6 +669,125 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar
 	}()
 }
 
+func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) {
+	// subscribe to the outputs and send them into LPMS
+	subscriber := trickle.NewTrickleSubscriber(url.String())
+
+	// Set up output buffers
+	rbc := media.RingBufferConfig{BufferLen: 50_000_000} // 50 MB buffer
+	outWriter, err := media.NewRingBuffer(&rbc)
+	if err != nil {
+		stopProcessing(ctx, params, fmt.Errorf("ringbuffer init failed: %w", err))
+		return
+	}
+
+	// Store data segments for the SSE endpoint
+	stream := params.liveParams.stream
+	dataStore := getDataStore(stream)
+
+	// read segments from the trickle subscription
+	go func() {
+		defer outWriter.Close()
+		defer removeDataStore(stream) // Clean up when done
+
+		var err error
+		firstSegment := true
+
+		retries := 0
+		// we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length
+		const retryPause = 300 * time.Millisecond
+		const maxRetries = 5
+		for {
+			select {
+			case <-ctx.Done():
+				clog.Info(ctx, "trickle subscribe done")
+				return
+			default:
+			}
+			if !params.inputStreamExists() {
+				clog.Infof(ctx, "trickle subscribe stopping, input stream does not exist.")
+				break
+			}
+			var segment *http.Response
+			clog.V(8).Infof(ctx, "trickle subscribe read data await")
+			segment, err = subscriber.Read()
+			if err != nil {
+				if errors.Is(err, trickle.EOS) || errors.Is(err, trickle.StreamNotFoundErr) {
+					stopProcessing(ctx, params, fmt.Errorf("trickle subscribe stopping, stream not found, err=%w", err))
+					return
+				}
+				var sequenceNonexistent *trickle.SequenceNonexistent
+				if errors.As(err, &sequenceNonexistent) {
+					// stream exists but segment doesn't, so skip to leading edge
+					subscriber.SetSeq(sequenceNonexistent.Latest)
+				}
+				// TODO if not EOS then signal a new orchestrator is needed
+				err = fmt.Errorf("trickle subscribe error reading: %w", err)
+				clog.Infof(ctx, "%s", err)
+				if retries > maxRetries {
+					stopProcessing(ctx, params, errors.New("trickle subscribe stopping, retries exceeded"))
+					return
+				}
+				retries++
+				params.liveParams.sendErrorEvent(err)
+				time.Sleep(retryPause)
+				continue
+			}
+			retries = 0
+			seq := trickle.GetSeq(segment)
+			clog.V(8).Infof(ctx, "trickle subscribe read data received seq=%d", seq)
+			copyStartTime := time.Now()
+
+			// Read the segment body so it can be stored for SSE
+			body, err := io.ReadAll(segment.Body)
+			segment.Body.Close()
+			if err != nil {
+				clog.InfofErr(ctx, "trickle subscribe error reading segment body seq=%d", seq, err)
+				subscriber.SetSeq(seq)
+				retries++
+				continue
+			}
+
+			// Store the raw segment data for the SSE endpoint
+			dataStore.Store(body)
+
+			// Write the body data to the output buffer
+			n, err := outWriter.Write(body)
+			if err != nil {
+				if errors.Is(err, context.Canceled) {
+					clog.Info(ctx, "trickle subscribe stopping - context canceled")
+					return
+				}
+
+				clog.InfofErr(ctx, "trickle subscribe error writing to output buffer seq=%d", seq, err)
+				subscriber.SetSeq(seq)
+				retries++
+				continue
+			}
+			if firstSegment {
+				firstSegment = false
+				delayMs := time.Since(params.liveParams.startTime).Milliseconds()
+				if monitor.Enabled {
+					monitor.AIFirstSegmentDelay(delayMs, params.liveParams.sess.OrchestratorInfo)
+					monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
+						"type":        "gateway_receive_first_data_segment",
+						"timestamp":   time.Now().UnixMilli(),
+						"stream_id":   params.liveParams.streamID,
+						"pipeline_id": params.liveParams.pipelineID,
+						"request_id":  params.liveParams.requestID,
+						"orchestrator_info": map[string]interface{}{
+							"address": params.liveParams.sess.Address(),
+							"url":     params.liveParams.sess.Transcoder(),
+						},
+					})
+				}
+			}
+
+			clog.V(8).Info(ctx, "trickle subscribe read data completed", "seq", seq, "bytes", humanize.Bytes(uint64(n)), "took", time.Since(copyStartTime))
+		}
+	}()
+}
+
 func (a aiRequestParams) inputStreamExists() bool {
 	if a.node == nil {
 		return false
@@ -705,7 +824,7 @@ const maxInflightSegments = 3
 // If inflight max is hit, returns true, false otherwise.
 func (s *SlowOrchChecker) BeginSegment() (int, bool) {
 	// Returns `false` if there are multiple segments in-flight
-	// this means the orchestrator is slow reading them
+	// this means the orchestrator is slow at reading segments
 	// If all-OK, returns `true`
 	s.mu.Lock()
 	defer s.mu.Unlock()
diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go
index 4d70635bcc..cfe01bd698 100644
--- a/server/ai_mediaserver.go
+++ b/server/ai_mediaserver.go
@@ -14,6 +14,7 @@ import (
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/livepeer/go-livepeer/monitor"
@@ -93,6 +94,7 @@ func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error {
 	ls.HTTPMux.Handle("POST /live/video-to-video/{stream}/start", ls.StartLiveVideo())
 	ls.HTTPMux.Handle("POST /live/video-to-video/{prefix}/{stream}/start", ls.StartLiveVideo())
 	ls.HTTPMux.Handle("POST /live/video-to-video/{stream}/update", ls.UpdateLiveVideo())
+	ls.HTTPMux.Handle("OPTIONS /live/video-to-video/{stream}/update", ls.WithCode(http.StatusNoContent))
 	ls.HTTPMux.Handle("/live/video-to-video/smoketest", ls.SmokeTestLiveVideo())
 
 	// Configure WHIP ingest only if an addr is specified.
@@ -107,6 +109,9 @@ func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error {
 	// Stream status
 	ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus())
 
+	// Stream data SSE endpoint
+	ls.HTTPMux.Handle("/live/video-to-video/{stream}/data", ls.GetLiveVideoToVideoData())
+
 	//API for dynamic capabilities
 	ls.HTTPMux.Handle("/process/request/", ls.SubmitJob())
 
@@ -738,16 +743,21 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
 	if err != nil {
 		return fmt.Errorf("invalid events URL: %w", err)
 	}
+	data, err := common.AppendHostname(strings.Replace(*resp.JSON200.EventsUrl, "-events", "-data", 1), host)
+	if err != nil {
+		return fmt.Errorf("invalid data URL: %w", err)
+	}
 
 	if resp.JSON200.ManifestId != nil {
 		ctx = clog.AddVal(ctx, "manifest_id", *resp.JSON200.ManifestId)
 		params.liveParams.manifestID = *resp.JSON200.ManifestId
 	}
 
-	clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s", pub, sub, control, events)
+	clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s data %s", pub, sub, control, events, data)
 
 	startControlPublish(ctx, control, params)
 	startTricklePublish(ctx, pub, params, params.liveParams.sess)
 	startTrickleSubscribe(ctx, sub, params, params.liveParams.sess)
 	startEventsSubscribe(ctx, events, params, params.liveParams.sess)
+	startDataSubscribe(ctx, data, params, params.liveParams.sess)
 
 	return nil
 }
@@ -810,6 +820,9 @@ func (ls *LivepeerServer) UpdateLiveVideo() http.Handler {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
+
+		w.Header().Set("Access-Control-Allow-Origin", "*")
+		w.WriteHeader(http.StatusOK)
 	})
 }
 
@@ -819,6 +832,12 @@
 // @Router /live/video-to-video/{stream}/status [get]
 func (ls *LivepeerServer) GetLiveVideoToVideoStatus() http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == http.MethodOptions {
+			corsHeaders(w, r.Method)
+			w.WriteHeader(http.StatusNoContent)
+			return
+		}
+
 		streamId := r.PathValue("streamId")
 		if streamId == "" {
 			http.Error(w, "stream id is required", http.StatusBadRequest)
@@ -1247,3 +1266,157 @@ func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler {
 		}()
 	})
 }
+
+// DataSegmentStore stores data segments for SSE streaming
+type DataSegmentStore struct {
+	streamID string
+	segments chan []byte
+	mu       sync.RWMutex
+	closed   bool
+}
+
+func NewDataSegmentStore(streamID string) *DataSegmentStore {
+	return &DataSegmentStore{
+		streamID: streamID,
+		segments: make(chan []byte, 100), // Buffer up to 100 segments
+	}
+}
+
+func (d *DataSegmentStore) Store(data []byte) {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	if d.closed {
+		return
+	}
+	select {
+	case d.segments <- data:
+	default:
+		// Channel is full, drop the oldest segment to make room
+		select {
+		case <-d.segments:
+		default:
+		}
+		select {
+		case d.segments <- data:
+		default:
+		}
+	}
+}

+func (d *DataSegmentStore) Subscribe() <-chan []byte {
+	return d.segments
+}
+
+func (d *DataSegmentStore) Close() {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if !d.closed {
+		d.closed = true
+		close(d.segments)
+	}
+}
+
+// Global store for data segments by stream ID
+var dataStores = make(map[string]*DataSegmentStore)
+var dataStoresMu sync.RWMutex
+
+func getDataStore(stream string) *DataSegmentStore {
+	dataStoresMu.RLock()
+	store, exists := dataStores[stream]
+	dataStoresMu.RUnlock()
+	if exists {
+		return store
+	}
+
+	dataStoresMu.Lock()
+	defer dataStoresMu.Unlock()
+	// Double-check after acquiring the write lock
+	if store, exists := dataStores[stream]; exists {
+		return store
+	}
+
+	store = NewDataSegmentStore(stream)
+	dataStores[stream] = store
+	return store
+}
+
+func removeDataStore(stream string) {
+	dataStoresMu.Lock()
+	defer dataStoresMu.Unlock()
+	if store, exists := dataStores[stream]; exists {
+		store.Close()
+		delete(dataStores, stream)
+	}
+}
+
+// @Summary Get Live Stream Data
+// @Param stream path string true "Stream name"
+// @Success 200
+// @Router /live/video-to-video/{stream}/data [get]
+func (ls *LivepeerServer) GetLiveVideoToVideoData() http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		stream := r.PathValue("stream")
+		if stream == "" {
+			http.Error(w, "stream name is required", http.StatusBadRequest)
+			return
+		}
+		if r.Method == http.MethodOptions {
+			corsHeaders(w, r.Method)
+			w.WriteHeader(http.StatusNoContent)
+			return
+		}
+
+		ctx := r.Context()
+		ctx = clog.AddVal(ctx, "stream", stream)
+
+		// Get the data store for this stream
+		dataStore := getDataStore(stream)
+		if dataStore == nil {
+			http.Error(w, "Stream not found", http.StatusNotFound)
+			return
+		}
+
+		// Set up SSE headers
+		w.Header().Set("Content-Type", "text/event-stream")
+		w.Header().Set("Cache-Control", "no-cache")
+		w.Header().Set("Connection", "keep-alive")
+		w.Header().Set("Access-Control-Allow-Origin", "*")
+
+		// Get the subscription channel
+		dataChan := dataStore.Subscribe()
+
+		flusher, ok := w.(http.Flusher)
+		if !ok {
+			http.Error(w, "Streaming not supported", http.StatusInternalServerError)
+			return
+		}
+
+		clog.Infof(ctx, "Starting SSE data stream for stream=%s", stream)
+
+		// Send an initial keep-alive ping
+		fmt.Fprintf(w, "event: ping\ndata: {\"type\":\"connected\"}\n\n")
+		flusher.Flush()
+
+		// Stream data segments as SSE events
+		for {
+			select {
+			case <-ctx.Done():
+				clog.Info(ctx, "SSE data stream client disconnected")
+				return
+			case data, ok := <-dataChan:
+				if !ok {
+					// Channel closed, stream ended
+					fmt.Fprintf(w, "event: end\ndata: {\"type\":\"stream_ended\"}\n\n")
+					flusher.Flush()
+					return
+				}
+
+				// Send the raw segment payload as the SSE data field; the trickle
+				// data channel publishes application/json, so this assumes the
+				// payload is single-line JSON (multi-line payloads would need
+				// per-line "data:" framing or base64 encoding)
+				fmt.Fprintf(w, "data: %s\n\n", data)
+				flusher.Flush()
+			}
+		}
+	})
+}
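Reviewer note: for reference, the SSE wire format the new /live/video-to-video/{stream}/data handler emits looks like the following; the middle payload is illustrative only, as actual payloads are whatever JSON the runner publishes on the -data trickle channel:

    event: ping
    data: {"type":"connected"}

    data: {"example_runner_output":true,"seq":42}

    event: end
    data: {"type":"stream_ended"}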
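Reviewer note: a minimal consumer sketch for manually testing the data endpoint. The gateway address (localhost:8935) and stream name (my-stream) are placeholders; the scanner buffer is enlarged because data segments can exceed bufio's 64 KB default line limit:

    package main

    import (
    	"bufio"
    	"fmt"
    	"net/http"
    	"strings"
    )

    func main() {
    	// Placeholder gateway address and stream name; adjust for your deployment.
    	resp, err := http.Get("http://localhost:8935/live/video-to-video/my-stream/data")
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()

    	// SSE frames are newline-delimited; allow lines larger than the 64 KB default.
    	scanner := bufio.NewScanner(resp.Body)
    	scanner.Buffer(make([]byte, 0, 1<<20), 16<<20)

    	for scanner.Scan() {
    		line := scanner.Text()
    		switch {
    		case strings.HasPrefix(line, "event: "):
    			fmt.Println("event:", strings.TrimPrefix(line, "event: "))
    		case strings.HasPrefix(line, "data: "):
    			fmt.Println("data:", strings.TrimPrefix(line, "data: "))
    		}
    	}
    	if err := scanner.Err(); err != nil {
    		panic(err)
    	}
    }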
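Reviewer note: the drop-oldest behavior of DataSegmentStore (fill the 100-slot channel, then verify the first segment gets evicted) is easy to pin down with a test; a sketch that could live next to the new code in the server package, with a hypothetical test name:

    package server

    import (
    	"bytes"
    	"fmt"
    	"testing"
    )

    func TestDataSegmentStoreDropsOldest(t *testing.T) {
    	store := NewDataSegmentStore("test-stream")
    	defer store.Close()

    	// Fill the 100-slot channel, then push one more segment to force an overflow.
    	for i := 0; i <= 100; i++ {
    		store.Store([]byte(fmt.Sprintf("seg-%d", i)))
    	}

    	// The oldest segment (seg-0) should have been dropped, so the
    	// first receive yields seg-1.
    	got := <-store.Subscribe()
    	if !bytes.Equal(got, []byte("seg-1")) {
    		t.Fatalf("expected seg-1 after overflow, got %s", got)
    	}
    }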