-
Notifications
You must be signed in to change notification settings - Fork 0
add statsutil, callback manager and streaming media utilities #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…ault
* callbacks: go1.23 does not allow type aliases with generics
util/callback/callbacks.go:16:6: generic type alias requires GOEXPERIMENT=aliastypeparams
* ifutil.FirstOrDefault: do not check array elements for "zero" anymore. That check had unexpected consequences, i.e. FirstOrDefault([]bool{false}, true) returned true, whereas the expected result is false.
plus * include latest errors-go, log-go, stretchr/testify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds comprehensive media streaming utilities and statistics infrastructure to support MPEG-TS, RTP, and TLV protocols. The changes introduce packet handling, pacing, validation, and monitoring capabilities extracted from content-fabric code.
Key changes:
- New statistics collection framework (
statsutil) for periodic and aggregate metrics - Generic callback registry system for concurrent-safe event notification
- Complete media protocol support for MPEG-TS, RTP, and TLV with packetizers, pacers, and validators
- Media I/O abstraction layer supporting UDP, RTP, SRT, and file sources/sinks
- Go 1.23 upgrade with updated dependencies (testify, log-go, errors-go)
Reviewed changes
Copilot reviewed 56 out of 57 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| util/statsutil/* | Statistics collection utilities for tracking count, min, max, sum, mean with periodic aggregation |
| util/callback/* | Thread-safe callback manager with async dispatch using channels |
| util/byteutil/ringbuffer.go | Ring buffer implementation using double-sized buffer to avoid modulo logic |
| util/ifutil/ifutil.go | Modified FirstOrDefault to return first element regardless of zero value |
| util/testutil/assets.go | Test asset path resolution with environment variable support |
| media/rtp/* | RTP packet handling including packetizer, pacer, gap detector, and timestamp unwrapper |
| media/mpegts/* | MPEG-TS stream tracking, validation, pacing, and monitoring |
| media/tlv/* | TLV packetizer and decapsulator for type-length-value encoding |
| media/io/* | Abstraction for packet sources/sinks supporting UDP, SRT, and file protocols |
| media/noop.go | No-op implementations of media interfaces for testing |
| media/transformer.go | Transformer chaining utility |
| format/util.go | QID extraction from hash/ID/token strings |
| format/duration/rounded.go | Duration type that formats with single decimal precision |
| format/structured/copy_test.go | Custom requireNotSame helper for testify compatibility |
| go.mod | Go 1.23 upgrade and dependency updates |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
dcae4bf to
e0a0e8e
Compare
e0a0e8e to
691438f
Compare
* some restructuring in media/io * basic unit tests for media/io
elv-gilles
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the media package should go into another module: common-go is used from modules that do not deal with media.
|
|
||
| require ( | ||
| github.com/Comcast/gots/v2 v2.2.1 | ||
| github.com/HdrHistogram/hdrhistogram-go v1.2.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fabric is currently using a replacement for this library pointing to a fork in qluvio. I believe the fork was made in order to include some fix.
github.com/qluvio/hdrhistogram-go v1.1.1-0.20210518163530-2bc0df1bbb91
Please clarify which one should be used.
| Max T `json:"max"` | ||
| Sum T `json:"sum"` | ||
| Mean float64 `json:"mean"` | ||
| m2 float64 // used for mean calc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
m2 would be used for variance calculation, it is updated but never used.
| default: | ||
| err = errors.E("createPacketSink", errors.K.Invalid, | ||
| "sink", sinkUrl, | ||
| "reason", "unsupported protocol, expecting udp|rtp|srt", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| "reason", "unsupported protocol, expecting udp|rtp|srt", | |
| "reason", "unsupported protocol, expecting udp|rtp|srt|file", |
| // ensures message boundaries of the sender are preserved | ||
| srtConfig.MessageAPI = true | ||
|
|
||
| if false { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please, add comment
| // log.Warn("srt connect error", err) | ||
| wc := io.WriteCloser(&ErrorWriter{err: errors.E("srt connect error", errors.K.Invalid.Default(), err)}) | ||
| dw.writer.Store(&wc) | ||
| time.Sleep(time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be configurable ?
| import "github.com/eluv-io/errors-go" | ||
|
|
||
| // NewTlvDecapsulator creates a new decapsulator for TLV payloads. | ||
| func NewTlvDecapsulator() *Decapsulator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not seem to be used
b598851 to
bed49ff
Compare
This PR is a combination of multiple, independent additions:
Statistics Utilities
/util/statsutilpackage withStatistics[T Number]: a utility for collecting statistics of a given measurement that calculates count, min, max, sum, mean and standard deviation.Periodic[T Number]: a utility for collecting periodic statistics, including current period, last period and aggregateMedia Protocol and Format Support:
Parsers, packetizers, pacers & monitors for MPEG-TS, RTP and TLV (type, length, value). Extracted and adapted from content-fabric code.
A new package
/media/iofor handling media packet I/O sources and sinks, supporting multiple protocols (UDP, RTP, SRT, and file).Utility Functions and Testing:
ExtractQIDutility informat/util.goto extract content IDs from various string formats, with comprehensive tests informat/util_test.go.format/structured/copy_test.goby replacingrequire.NotSamewith a customrequireNotSameto handle non-pointer comparisons, addressing breaking changes in the testify library.Dependency and Module Updates:
go.mod, includingtestify,log-go,errors-go, and added new dependencies for SRT and related protocols.Other:
Roundedinformat/duration/rounded.gofor duration formatting.