From 95e39d3b9f863e667cf5757f967ccd4bfc26be57 Mon Sep 17 00:00:00 2001 From: manishiitg Date: Mon, 21 Jun 2021 07:24:17 +0200 Subject: [PATCH] fixes --- loadtest/client/gst/compositor.go | 2 +- loadtest/client/producer.go | 5 +- loadtest/gstreamer-src/gst.c | 86 +++++++++++++++++++++ loadtest/gstreamer-src/gst.go | 123 ++++++++++++++++++++++++++++++ loadtest/gstreamer-src/gst.h | 16 ++++ loadtest/load.go | 79 ++++++++++++++++++- tracktodisk/cloudwriter.go | 1 + tracktodisk/disk.go | 10 +-- 8 files changed, 309 insertions(+), 13 deletions(-) create mode 100644 loadtest/gstreamer-src/gst.c create mode 100644 loadtest/gstreamer-src/gst.go create mode 100644 loadtest/gstreamer-src/gst.h diff --git a/loadtest/client/gst/compositor.go b/loadtest/client/gst/compositor.go index 20efb1d..261f3d6 100644 --- a/loadtest/client/gst/compositor.go +++ b/loadtest/client/gst/compositor.go @@ -32,7 +32,7 @@ type CompositorPipeline struct { func NewCompositorPipeline(extraPipelineStr string) *CompositorPipeline { pipelineStr := ` compositor name=vmix background=black ! video/x-raw,width=1920,height=1080,framerate=30/1,format=UYVY ! queue ! tee name=vtee - vtee. ! queue ! autovideosink sync=false + vtee. ! queue ! glimagesink sync=false audiomixer name=amix ! queue ! tee name=atee atee. ! queue ! audioconvert ! autoaudiosink ` + extraPipelineStr diff --git a/loadtest/client/producer.go b/loadtest/client/producer.go index b3d4843..907c184 100644 --- a/loadtest/client/producer.go +++ b/loadtest/client/producer.go @@ -30,12 +30,12 @@ type GSTProducer struct { // NewGSTProducer will create a new producer for a given client and a videoFile func NewGSTProducer(kind string, path string) *GSTProducer { stream := fmt.Sprintf("gst-%v-%v", kind, cuid.New()) - videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, cuid.New(), stream) + videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/h264", ClockRate: 90000}, cuid.New(), stream) if err != nil { panic(err) } - audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, cuid.New(), stream) + audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "audio/opus", ClockRate: 48000}, cuid.New(), stream) if err != nil { panic(err) } @@ -83,7 +83,6 @@ func (t *GSTProducer) Pause(pause bool) { //Stop the pipeline func (t *GSTProducer) Stop() { - t.pipeline.Pause() } //Start the pipeline diff --git a/loadtest/gstreamer-src/gst.c b/loadtest/gstreamer-src/gst.c new file mode 100644 index 0000000..f91c393 --- /dev/null +++ b/loadtest/gstreamer-src/gst.c @@ -0,0 +1,86 @@ +#include "gst.h" + +#include + +typedef struct SampleHandlerUserData { + int pipelineId; +} SampleHandlerUserData; + +GMainLoop *gstreamer_send_main_loop = NULL; +void gstreamer_send_start_mainloop(void) { + gstreamer_send_main_loop = g_main_loop_new(NULL, FALSE); + + g_main_loop_run(gstreamer_send_main_loop); +} + +static gboolean gstreamer_send_bus_call(GstBus *bus, GstMessage *msg, gpointer data) { + switch (GST_MESSAGE_TYPE(msg)) { + + case GST_MESSAGE_EOS: + g_print("End of stream\n"); + exit(1); + break; + + case GST_MESSAGE_ERROR: { + gchar *debug; + GError *error; + + gst_message_parse_error(msg, &error, &debug); + g_free(debug); + + g_printerr("Error: %s\n", error->message); + g_error_free(error); + exit(1); + } + default: + break; + } + + return TRUE; +} + +GstFlowReturn gstreamer_send_new_sample_handler(GstElement *object, gpointer user_data) { + GstSample *sample = NULL; + GstBuffer *buffer = NULL; + gpointer copy = NULL; + gsize copy_size = 0; + SampleHandlerUserData *s = (SampleHandlerUserData *)user_data; + + g_signal_emit_by_name (object, "pull-sample", &sample); + if (sample) { + buffer = gst_sample_get_buffer(sample); + if (buffer) { + gst_buffer_extract_dup(buffer, 0, gst_buffer_get_size(buffer), ©, ©_size); + goHandlePipelineBuffer(copy, copy_size, GST_BUFFER_DURATION(buffer), s->pipelineId); + } + gst_sample_unref (sample); + } + + return GST_FLOW_OK; +} + +GstElement *gstreamer_send_create_pipeline(char *pipeline) { + gst_init(NULL, NULL); + GError *error = NULL; + return gst_parse_launch(pipeline, &error); +} + +void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId) { + SampleHandlerUserData *s = calloc(1, sizeof(SampleHandlerUserData)); + s->pipelineId = pipelineId; + + GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline)); + gst_bus_add_watch(bus, gstreamer_send_bus_call, NULL); + gst_object_unref(bus); + + GstElement *appsink = gst_bin_get_by_name(GST_BIN(pipeline), "appsink"); + g_object_set(appsink, "emit-signals", TRUE, NULL); + g_signal_connect(appsink, "new-sample", G_CALLBACK(gstreamer_send_new_sample_handler), s); + gst_object_unref(appsink); + + gst_element_set_state(pipeline, GST_STATE_PLAYING); +} + +void gstreamer_send_stop_pipeline(GstElement *pipeline) { + gst_element_set_state(pipeline, GST_STATE_NULL); +} diff --git a/loadtest/gstreamer-src/gst.go b/loadtest/gstreamer-src/gst.go new file mode 100644 index 0000000..8ff90f2 --- /dev/null +++ b/loadtest/gstreamer-src/gst.go @@ -0,0 +1,123 @@ +// Package gst provides an easy API to create an appsink pipeline +package gstreamergst + +/* +#cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0 +#include "gst.h" +*/ +import "C" +import ( + "fmt" + "sync" + "time" + "unsafe" + + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" +) + +func init() { + go C.gstreamer_send_start_mainloop() +} + +// Pipeline is a wrapper for a GStreamer Pipeline +type Pipeline struct { + Pipeline *C.GstElement + tracks []*webrtc.TrackLocalStaticSample + id int + codecName string + clockRate float32 +} + +var pipelines = make(map[int]*Pipeline) +var pipelinesLock sync.Mutex + +const ( + videoClockRate = 90000 + audioClockRate = 48000 + pcmClockRate = 8000 +) + +// CreatePipeline creates a GStreamer Pipeline +func CreatePipeline(codecName string, tracks []*webrtc.TrackLocalStaticSample, pipelineSrc string) *Pipeline { + pipelineStr := "appsink name=appsink" + var clockRate float32 + + switch codecName { + case "vp8": + pipelineStr = pipelineSrc + " ! vp8enc error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=true cpu-used=5 deadline=1 ! " + pipelineStr + clockRate = videoClockRate + + case "vp9": + pipelineStr = pipelineSrc + " ! vp9enc ! " + pipelineStr + clockRate = videoClockRate + + case "h264": + pipelineStr = pipelineSrc + " ! video/x-raw,format=I420 ! x264enc speed-preset=ultrafast tune=zerolatency key-int-max=20 ! video/x-h264,stream-format=byte-stream ! " + pipelineStr + clockRate = videoClockRate + + case "opus": + pipelineStr = pipelineSrc + " ! opusenc ! " + pipelineStr + clockRate = audioClockRate + + case "g722": + pipelineStr = pipelineSrc + " ! avenc_g722 ! " + pipelineStr + clockRate = audioClockRate + + case "pcmu": + pipelineStr = pipelineSrc + " ! audio/x-raw, rate=8000 ! mulawenc ! " + pipelineStr + clockRate = pcmClockRate + + case "pcma": + pipelineStr = pipelineSrc + " ! audio/x-raw, rate=8000 ! alawenc ! " + pipelineStr + clockRate = pcmClockRate + + default: + panic("Unhandled codec " + codecName) + } + + pipelineStrUnsafe := C.CString(pipelineStr) + defer C.free(unsafe.Pointer(pipelineStrUnsafe)) + + pipelinesLock.Lock() + defer pipelinesLock.Unlock() + + pipeline := &Pipeline{ + Pipeline: C.gstreamer_send_create_pipeline(pipelineStrUnsafe), + tracks: tracks, + id: len(pipelines), + codecName: codecName, + clockRate: clockRate, + } + + pipelines[pipeline.id] = pipeline + return pipeline +} + +// Start starts the GStreamer Pipeline +func (p *Pipeline) Start() { + C.gstreamer_send_start_pipeline(p.Pipeline, C.int(p.id)) +} + +// Stop stops the GStreamer Pipeline +func (p *Pipeline) Stop() { + C.gstreamer_send_stop_pipeline(p.Pipeline) +} + +//export goHandlePipelineBuffer +func goHandlePipelineBuffer(buffer unsafe.Pointer, bufferLen C.int, duration C.int, pipelineID C.int) { + pipelinesLock.Lock() + pipeline, ok := pipelines[int(pipelineID)] + pipelinesLock.Unlock() + + if ok { + for _, t := range pipeline.tracks { + if err := t.WriteSample(media.Sample{Data: C.GoBytes(buffer, bufferLen), Duration: time.Duration(duration)}); err != nil { + panic(err) + } + } + } else { + fmt.Printf("discarding buffer, no pipeline with id %d", int(pipelineID)) + } + C.free(buffer) +} diff --git a/loadtest/gstreamer-src/gst.h b/loadtest/gstreamer-src/gst.h new file mode 100644 index 0000000..d62a14d --- /dev/null +++ b/loadtest/gstreamer-src/gst.h @@ -0,0 +1,16 @@ +// #ifndef GST_H +// #define GST_H + +// #include +// #include +// #include +// #include + +// extern void goHandlePipelineBuffer(void *buffer, int bufferLen, int samples, int pipelineId); + +// GstElement *gstreamer_send_create_pipeline(char *pipeline); +// void gstreamer_send_start_pipeline(GstElement *pipeline, int pipelineId); +// void gstreamer_send_stop_pipeline(GstElement *pipeline); +// void gstreamer_send_start_mainloop(void); + +// #endif \ No newline at end of file diff --git a/loadtest/load.go b/loadtest/load.go index addcafd..526f9b1 100644 --- a/loadtest/load.go +++ b/loadtest/load.go @@ -53,10 +53,9 @@ func getFileByType(file string) string { if file == "h264" { //TODO not working as of now need to debug // load is there but doesn't play on browser. track should play on browser also - // issue is on browser side - filepath = "/var/tmp/Big_Buck_Bunny_720_10s_1MB.mp4" + filepath = "/var/tmp/Jellyfish_360_10s_1MB.mp4" if _, err := os.Stat(filepath); os.IsNotExist(err) { - err := util.DownloadFile(filepath, "https://test-videos.co.uk/vids/bigbuckbunny/mp4/h264/720/Big_Buck_Bunny_720_10s_1MB.mp4") + err := util.DownloadFile(filepath, "https://test-videos.co.uk/vids/jellyfish/mp4/h264/360/Jellyfish_360_10s_1MB.mp4") if err != nil { log.Infof("error downloading file %v", err) filepath = "test" @@ -139,6 +138,57 @@ func run(e *sdk.Engine, addr, session, file, role string, total, duration, cycle util.HandleDataChannel(c, "loadtest", i, cid) if !strings.Contains(file, ".webm") { + + // if file == "test2" { + + // audioSrc := "audiotestsrc" + // videoSrc := "videotestsrc" + + // videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/vp8"}, "video", "pion2") + // if err != nil { + // panic(err) + // } + + // // _, err = peerConnection.AddTrack(videoTrack) + // // if err != nil { + // // panic(err) + // // } + + // audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "audio/opus"}, "audio", "pion1") + // if err != nil { + // panic(err) + // } + // // _, err = peerConnection.AddTrack(audioTrack) + // // if err != nil { + // // panic(err) + // // } + + // // client join a session + // c.Join(new_session, nil) + // defer e.DelClient(c) + + // t, _ := c.GetPubTransport().GetPeerConnection().AddTransceiverFromTrack(audioTrack, webrtc.RTPTransceiverInit{ + // Direction: webrtc.RTPTransceiverDirectionSendonly, + // }) + // defer c.UnPublish(t) + + // t2, _ := c.GetPubTransport().GetPeerConnection().AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{ + // Direction: webrtc.RTPTransceiverDirectionSendonly, + // }) + // defer c.UnPublish(t2) + // c.OnNegotiationNeeded() + + // if err != nil { + // log.Errorf("join err=%v", err) + // panic(err) + // } + + // // Start pushing buffers on these tracks + // gstreamergst.CreatePipeline("opus", []*webrtc.TrackLocalStaticSample{audioTrack}, audioSrc).Start() + // gstreamergst.CreatePipeline("vp8", []*webrtc.TrackLocalStaticSample{videoTrack}, videoSrc).Start() + + // } else { + //this stopped working. need to debug //i.e video is not showing on webapp c.Join(new_session, nil) @@ -162,6 +212,26 @@ func run(e *sdk.Engine, addr, session, file, role string, total, duration, cycle defer c.UnPublish(t2) c.OnNegotiationNeeded() + go func() { + rtcpBuf := make([]byte, 1500) + for { + if _, _, rtcpErr := t2.Sender().Read(rtcpBuf); rtcpErr != nil { + log.Infof("videoSender rtcp error %v", err) + return + } + } + }() + + go func() { + rtcpBuf := make([]byte, 1500) + for { + if _, _, rtcpErr := t.Sender().Read(rtcpBuf); rtcpErr != nil { + log.Infof("audioSender rtcp error %v", err) + return + } + } + }() + go func() { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() @@ -180,7 +250,7 @@ func run(e *sdk.Engine, addr, session, file, role string, total, duration, cycle } }() - producer.Start() + go producer.Start() defer producer.Stop() defer func() { @@ -188,6 +258,7 @@ func run(e *sdk.Engine, addr, session, file, role string, total, duration, cycle }() log.Infof("tracks published") + // } } else { c.Join(new_session, nil) diff --git a/tracktodisk/cloudwriter.go b/tracktodisk/cloudwriter.go index f1a4ca6..c30ea57 100644 --- a/tracktodisk/cloudwriter.go +++ b/tracktodisk/cloudwriter.go @@ -43,6 +43,7 @@ func NewCloudFileWriter(path string) *FileWriter { log.Errorf("Bucket(%v).Create: %v", bucketName, err) } + util.UpdateMeta(bucketName + "/" + path) w := bucket.Object(path).NewWriter(ctx) fw.wr = w fw.client = client diff --git a/tracktodisk/disk.go b/tracktodisk/disk.go index ee5f1b8..beab1e2 100644 --- a/tracktodisk/disk.go +++ b/tracktodisk/disk.go @@ -148,12 +148,12 @@ func tracktodisk(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver, saver maxTimeLate := time.Millisecond * time.Duration(0) builder := avp.MustBuilder(avp.NewBuilder(track, maxPacketsLate, avp.WithMaxLateTime(maxTimeLate))) - if track.Kind() == webrtc.RTPCodecTypeVideo { - err := client.GetSubTransport().GetPeerConnection().WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC: uint32(track.SSRC()), MediaSSRC: uint32(track.SSRC())}}) - if err != nil { - log.Errorf("error writing pli %s", err) - } + // if track.Kind() == webrtc.RTPCodecTypeVideo { + err := client.GetSubTransport().GetPeerConnection().WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC: uint32(track.SSRC()), MediaSSRC: uint32(track.SSRC())}}) + if err != nil { + log.Errorf("error writing pli %s", err) } + // } diskTrackMap[track.ID()] = track.Kind().String() builder.AttachElement(saver) go pliLoop(client, track, 1000)