Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SmartPtr: Support load test for source by srs-bench. v6.0.130 #4097

Merged
merged 11 commits into from
Jun 20, 2024
Prev Previous commit
Next Next commit
SmartPtr: Support RTC reconnect load test.
  • Loading branch information
winlinvip committed Jun 19, 2024
commit 15a94279a9e3bae491e56a07573c2fe71029f363
9 changes: 9 additions & 0 deletions trunk/3rdparty/srs-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ for ((i=0;;i++)); do
done
```

WebRTC重连测试:

```bash
for ((i=0;;i++)); do
./objs/srs_bench -sfu=rtc -pr=webrtc://localhost/live${i}/livestream -sn=1000 -cap=true;
sleep 10;
done
```

## Regression Test

回归测试需要先启动[SRS](https://github.com/ossrs/srs/issues/307),支持WebRTC推拉流:
Expand Down
21 changes: 18 additions & 3 deletions trunk/3rdparty/srs-bench/srs/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go
func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error {
func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC, closeAfterPublished bool) error {
ctx = logger.WithContext(ctx)

logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v",
Expand Down Expand Up @@ -77,10 +77,13 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
return nil, err
}

if sourceAudio != "" {
// For CAP, we always add audio track, because both audio and video are disabled for CAP, which will
// cause failed when exchange SDP.
if sourceAudio != "" || closeAfterPublished {
aIngester = newAudioIngester(sourceAudio)
registry.Add(&rtpInteceptorFactory{aIngester.audioLevelInterceptor})
}

if sourceVideo != "" {
vIngester = newVideoIngester(sourceVideo)
registry.Add(&rtpInteceptorFactory{vIngester.markerInterceptor})
Expand Down Expand Up @@ -178,6 +181,7 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i

// Wait for event from context or tracks.
var wg sync.WaitGroup
defer wg.Wait()

wg.Add(1)
go func() {
Expand All @@ -186,6 +190,18 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
doClose() // Interrupt the RTCP read.
}()

// If CAP, directly close the connection after published.
if closeAfterPublished {
select {
case <-ctx.Done():
case <-pcDoneCtx.Done():
}

logger.Tf(ctx, "Close connection after published")
cancel()
return nil
}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -295,6 +311,5 @@ func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
}
}()

wg.Wait()
return nil
}
13 changes: 9 additions & 4 deletions trunk/3rdparty/srs-bench/srs/srs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var clients, streams, delay int

var statListen string

var closeAfterPublished bool

func Parse(ctx context.Context) {
fl := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)

Expand All @@ -71,6 +73,8 @@ func Parse(ctx context.Context) {

fl.StringVar(&statListen, "stat", "", "")

fl.BoolVar(&closeAfterPublished, "cap", false, "")

fl.Usage = func() {
fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
fmt.Println(fmt.Sprintf("Options:"))
Expand All @@ -95,6 +99,7 @@ func Parse(ctx context.Context) {
fmt.Println(fmt.Sprintf(" -fps [Optional] The fps of .h264 source file."))
fmt.Println(fmt.Sprintf(" -sa [Optional] The file path to read audio, ignore if empty."))
fmt.Println(fmt.Sprintf(" -sv [Optional] The file path to read video, ignore if empty."))
fmt.Println(fmt.Sprintf(" -cap Whether to close connection after publish. Default: false"))
fmt.Println(fmt.Sprintf("\n例如,1个播放,1个推流:"))
fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -pr webrtc://localhost/live/livestream -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
Expand All @@ -118,7 +123,7 @@ func Parse(ctx context.Context) {
if sr == "" && pr == "" {
showHelp = true
}
if pr != "" && (sourceAudio == "" && sourceVideo == "") {
if pr != "" && !closeAfterPublished && (sourceAudio == "" && sourceVideo == "") {
showHelp = true
}
if showHelp {
Expand All @@ -135,8 +140,8 @@ func Parse(ctx context.Context) {
summaryDesc = fmt.Sprintf("%v, play(url=%v, da=%v, dv=%v, pli=%v)", summaryDesc, sr, dumpAudio, dumpVideo, pli)
}
if pr != "" {
summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v)",
summaryDesc, pr, sourceAudio, sourceVideo, fps)
summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v, cap=%v)",
summaryDesc, pr, sourceAudio, sourceVideo, fps, closeAfterPublished)
}
logger.Tf(ctx, "Run benchmark with %v", summaryDesc)

Expand Down Expand Up @@ -271,7 +276,7 @@ func Run(ctx context.Context) error {
gStatRTC.Publishers.Alive--
}()

if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC); err != nil {
if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC, closeAfterPublished); err != nil {
if errors.Cause(err) != context.Canceled {
logger.Wf(ctx, "Run err %+v", err)
}
Expand Down
91 changes: 75 additions & 16 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ using namespace std;
const int kRtpMaxPayloadSize = kRtpPacketSize - 300;
#endif

// the time to cleanup source.
#define SRS_RTC_SOURCE_CLEANUP (3 * SRS_UTIME_SECONDS)

// TODO: Add this function into SrsRtpMux class.
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf)
{
Expand Down Expand Up @@ -244,11 +247,56 @@ void SrsRtcConsumer::on_stream_change(SrsRtcSourceDescription* desc)
SrsRtcSourceManager::SrsRtcSourceManager()
{
lock = srs_mutex_new();
timer_ = new SrsHourGlass("sources", this, 1 * SRS_UTIME_SECONDS);
}

SrsRtcSourceManager::~SrsRtcSourceManager()
{
srs_mutex_destroy(lock);
srs_freep(timer_);
}

srs_error_t SrsRtcSourceManager::initialize()
{
return setup_ticks();
}

srs_error_t SrsRtcSourceManager::setup_ticks()
{
srs_error_t err = srs_success;

if ((err = timer_->tick(1, 3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "tick");
}

if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "timer");
}

return err;
}

srs_error_t SrsRtcSourceManager::notify(int event, srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;

std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it;
for (it = pool.begin(); it != pool.end();) {
SrsSharedPtr<SrsRtcSource>& source = it->second;

// When source expired, remove it.
// @see https://github.com/ossrs/srs/issues/713
if (source->stream_is_dead()) {
SrsContextId cid = source->source_id();
if (cid.empty()) cid = source->pre_source_id();
srs_trace("RTC: cleanup die source, id=[%s], total=%d", cid.c_str(), (int)pool.size());
pool.erase(it++);
} else {
++it;
}
}

return err;
}

srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr<SrsRtcSource>& pps)
Expand Down Expand Up @@ -305,19 +353,6 @@ SrsSharedPtr<SrsRtcSource> SrsRtcSourceManager::fetch(SrsRequest* r)
return source;
}

void SrsRtcSourceManager::eliminate(SrsRequest* r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);

string stream_url = r->get_stream_url();
std::map< std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);
if (it != pool.end()) {
pool.erase(it);
}
}

SrsRtcSourceManager* _srs_rtc_sources = NULL;

ISrsRtcPublishStream::ISrsRtcPublishStream()
Expand Down Expand Up @@ -351,6 +386,7 @@ SrsRtcSource::SrsRtcSource()
#endif

pli_for_rtmp_ = pli_elapsed_ = 0;
stream_die_at_ = 0;
}

SrsRtcSource::~SrsRtcSource()
Expand Down Expand Up @@ -384,6 +420,27 @@ srs_error_t SrsRtcSource::initialize(SrsRequest* r)
return err;
}

bool SrsRtcSource::stream_is_dead()
{
// still publishing?
if (is_created_) {
return false;
}

// has any consumers?
if (!consumers.empty()) {
return false;
}

// Delay cleanup source.
srs_utime_t now = srs_get_system_time();
if (now < stream_die_at_ + SRS_RTC_SOURCE_CLEANUP) {
return false;
}

return true;
}

void SrsRtcSource::init_for_play_before_publishing()
{
// If the stream description has already been setup by RTC publisher,
Expand Down Expand Up @@ -497,6 +554,8 @@ srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer)
consumer = new SrsRtcConsumer(this);
consumers.push_back(consumer);

stream_die_at_ = 0;

// TODO: FIXME: Implements edge cluster.

return err;
Expand Down Expand Up @@ -530,7 +589,7 @@ void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer)

// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
stream_die_at_ = srs_get_system_time();
}
}

Expand Down Expand Up @@ -633,8 +692,8 @@ void SrsRtcSource::on_unpublish()
stat->on_stream_close(req);

// Destroy and cleanup source when no publishers and consumers.
if (!is_created_ && consumers.empty()) {
_srs_rtc_sources->eliminate(req);
if (consumers.empty()) {
stream_die_at_ = srs_get_system_time();
}
}

Expand Down
18 changes: 14 additions & 4 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,21 @@ class SrsRtcConsumer
void on_stream_change(SrsRtcSourceDescription* desc);
};

class SrsRtcSourceManager
class SrsRtcSourceManager : public ISrsHourGlass
{
private:
srs_mutex_t lock;
std::map< std::string, SrsSharedPtr<SrsRtcSource> > pool;
SrsHourGlass* timer_;
public:
SrsRtcSourceManager();
virtual ~SrsRtcSourceManager();
public:
virtual srs_error_t initialize();
// interface ISrsHourGlass
private:
virtual srs_error_t setup_ticks();
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
public:
// create source when fetch from cache failed.
// @param r the client request.
Expand All @@ -127,9 +134,6 @@ class SrsRtcSourceManager
public:
// Get the exists source, NULL when not exists.
virtual SrsSharedPtr<SrsRtcSource> fetch(SrsRequest* r);
public:
// Dispose and destroy the source.
virtual void eliminate(SrsRequest* r);
};

// Global singleton instance.
Expand Down Expand Up @@ -195,11 +199,17 @@ class SrsRtcSource : public ISrsFastTimer
// The PLI for RTC2RTMP.
srs_utime_t pli_for_rtmp_;
srs_utime_t pli_elapsed_;
private:
// The last die time, while die means neither publishers nor players.
srs_utime_t stream_die_at_;
public:
SrsRtcSource();
virtual ~SrsRtcSource();
public:
virtual srs_error_t initialize(SrsRequest* r);
public:
// Whether stream is dead, which is no publisher or player.
virtual bool stream_is_dead();
private:
void init_for_play_before_publishing();
public:
Expand Down
11 changes: 9 additions & 2 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ using namespace std;
#ifdef SRS_RTC
#include <srs_app_rtc_network.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_rtc_source.hpp>
#endif
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
Expand Down Expand Up @@ -809,12 +810,18 @@ srs_error_t SrsServer::start(SrsWaitGroup* wg)
srs_error_t err = srs_success;

if ((err = _srs_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
return srs_error_wrap(err, "live sources");
}

#ifdef SRS_SRT
if ((err = _srs_srt_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "sources");
return srs_error_wrap(err, "srt sources");
}
#endif

#ifdef SRS_RTC
if ((err = _srs_rtc_sources->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc sources");
}
#endif

Expand Down
Loading