From e95cd740f9460aef122bbc9d9a581d9bd3ad39fc Mon Sep 17 00:00:00 2001 From: boenshao Date: Fri, 16 Aug 2024 23:02:26 +0800 Subject: [PATCH] perf: chunk and process playback segments concurrently --- internal/playback/on_list.go | 151 ++++++++++++++++++++++++----------- 1 file changed, 106 insertions(+), 45 deletions(-) diff --git a/internal/playback/on_list.go b/internal/playback/on_list.go index 219c171a99d..c955c3afeff 100644 --- a/internal/playback/on_list.go +++ b/internal/playback/on_list.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "os" + "sort" "strconv" "time" @@ -27,6 +28,67 @@ type listEntry struct { Start time.Time `json:"start"` Duration listEntryDuration `json:"duration"` URL string `json:"url"` + init *fmp4.Init +} + +func computeDurationAndConcatenateFMP4(segments []*recordstore.Segment) ([]listEntry, error) { + out := []listEntry{} + var prevInit *fmp4.Init + + for _, seg := range segments { + err := func() error { + f, err := os.Open(seg.Fpath) + if err != nil { + return err + } + defer f.Close() + + init, err := segmentFMP4ReadInit(f) + if err != nil { + return err + } + + _, err = f.Seek(0, io.SeekStart) + if err != nil { + return err + } + + maxDuration, err := segmentFMP4ReadMaxDuration(f, init) + if err != nil { + return err + } + + if len(out) != 0 && segmentFMP4CanBeConcatenated( + prevInit, + out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)), + init, + seg.Start) { + prevStart := out[len(out)-1].Start + curEnd := seg.Start.Add(maxDuration) + out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart)) + } else { + out = append(out, listEntry{ + Start: seg.Start, + Duration: listEntryDuration(maxDuration), + init: init, + }) + } + + prevInit = init + + return nil + }() + if err != nil { + return nil, err + } + } + + return out, nil +} + +type concatEntryRes struct { + entry []listEntry + err error } func computeDurationAndConcatenate( @@ -34,54 +96,53 @@ func computeDurationAndConcatenate( segments []*recordstore.Segment, ) ([]listEntry, error) { if recordFormat == conf.RecordFormatFMP4 { + const numWorkers = 4 + chunkSize := (len(segments) + numWorkers - 1) / numWorkers + ch := make(chan (*concatEntryRes), numWorkers) + defer close(ch) + + numChunks := 0 + for i := 0; i < len(segments); i += chunkSize { + end := i + chunkSize + if end > len(segments) { + end = len(segments) + } + + numChunks++ + go func(segments []*recordstore.Segment) { + entry, err := computeDurationAndConcatenateFMP4(segments) + ch <- &concatEntryRes{entry: entry, err: err} + }(segments[i:end]) + } + + entries := []listEntry{} + for i := 0; i < numChunks; i++ { + res := <-ch + if res.err != nil { + return nil, res.err + } + entries = append(entries, res.entry...) + } + sort.Slice(entries, func(i, j int) bool { + return entries[i].Start.Before(entries[j].Start) + }) + out := []listEntry{} var prevInit *fmp4.Init - - for _, seg := range segments { - err := func() error { - f, err := os.Open(seg.Fpath) - if err != nil { - return err - } - defer f.Close() - - init, err := segmentFMP4ReadInit(f) - if err != nil { - return err - } - - _, err = f.Seek(0, io.SeekStart) - if err != nil { - return err - } - - maxDuration, err := segmentFMP4ReadMaxDuration(f, init) - if err != nil { - return err - } - - if len(out) != 0 && segmentFMP4CanBeConcatenated( - prevInit, - out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)), - init, - seg.Start) { - prevStart := out[len(out)-1].Start - curEnd := seg.Start.Add(maxDuration) - out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart)) - } else { - out = append(out, listEntry{ - Start: seg.Start, - Duration: listEntryDuration(maxDuration), - }) - } - - prevInit = init - - return nil - }() - if err != nil { - return nil, err + for _, entry := range entries { + if len(out) != 0 && segmentFMP4CanBeConcatenated( + prevInit, + out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)), + entry.init, + entry.Start) { + prevStart := out[len(out)-1].Start + curEnd := entry.Start.Add(time.Duration(entry.Duration)) + out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart)) + } else { + out = append(out, entry) } + + prevInit = entry.init } return out, nil